From 9848841ad97db7db16cafcfeb40953bee364d62e Mon Sep 17 00:00:00 2001 From: yacovm Date: Tue, 1 Sep 2020 04:32:11 +0300 Subject: [PATCH] [FAB-18171] Disregard certificate validity period in intra-orderer communication (#1825) This change set makes the orderer cluster authentication infrastructure disregard validity periods when comparing certificates, and only regard public keys. With this change, one can replace the TLS certificate of Raft, by a certificate that has the same public key without issuing channel config updates. This is to ensure that if a certificate of a consenter expired, one can quickly renew it and start the orderer without performing a config update per channel. Change-Id: Id1b29e220d37aa33617b2143b702075bf5b01f6f Signed-off-by: yacovm --- common/crypto/expiration.go | 45 +++++++ common/crypto/expiration_test.go | 117 +++++++++++++++++ integration/e2e/cft_test.go | 111 ++++++++++++++++- orderer/common/cluster/comm.go | 20 +-- orderer/common/cluster/comm_test.go | 6 + orderer/common/cluster/connections.go | 8 +- orderer/common/cluster/util.go | 124 +++++++++++++++++-- orderer/common/cluster/util_test.go | 40 ++++++ orderer/common/server/onboarding.go | 5 +- orderer/consensus/etcdraft/chain.go | 7 +- orderer/consensus/etcdraft/consenter.go | 17 ++- orderer/consensus/etcdraft/consenter_test.go | 19 ++- orderer/consensus/etcdraft/util.go | 42 ++++++- orderer/consensus/etcdraft/util_test.go | 27 +++- 14 files changed, 545 insertions(+), 43 deletions(-) diff --git a/common/crypto/expiration.go b/common/crypto/expiration.go index 638dcd2a61a..6eb3a93c6ff 100644 --- a/common/crypto/expiration.go +++ b/common/crypto/expiration.go @@ -7,8 +7,10 @@ SPDX-License-Identifier: Apache-2.0 package crypto import ( + "bytes" "crypto/x509" "encoding/pem" + "errors" "time" "github.com/golang/protobuf/proto" @@ -98,3 +100,46 @@ func trackCertExpiration(rawCert []byte, certRole string, info MessageFunc, warn }) } + +var ( + // ErrPubKeyMismatch is used by CertificatesWithSamePublicKey to indicate the two public keys mismatch + ErrPubKeyMismatch = errors.New("public keys do not match") +) + +// LogNonPubKeyMismatchErr logs an error which is not an ErrPubKeyMismatch error +func LogNonPubKeyMismatchErr(log func(template string, args ...interface{}), err error, cert1DER, cert2DER []byte) { + cert1PEM := &pem.Block{Type: "CERTIFICATE", Bytes: cert1DER} + cert2PEM := &pem.Block{Type: "CERTIFICATE", Bytes: cert2DER} + log("Failed determining if public key of %s matches public key of %s: %s", + string(pem.EncodeToMemory(cert1PEM)), + string(pem.EncodeToMemory(cert2PEM)), + err) +} + +// CertificatesWithSamePublicKey returns nil if both byte slices +// are valid DER encoding of certificates with the same public key. +func CertificatesWithSamePublicKey(der1, der2 []byte) error { + cert1canonized, err := publicKeyFromCertificate(der1) + if err != nil { + return err + } + + cert2canonized, err := publicKeyFromCertificate(der2) + if err != nil { + return err + } + + if bytes.Equal(cert1canonized, cert2canonized) { + return nil + } + return ErrPubKeyMismatch +} + +// publicKeyFromCertificate returns the public key of the given ASN1 DER certificate. +func publicKeyFromCertificate(der []byte) ([]byte, error) { + cert, err := x509.ParseCertificate(der) + if err != nil { + return nil, err + } + return x509.MarshalPKIXPublicKey(cert.PublicKey) +} diff --git a/common/crypto/expiration_test.go b/common/crypto/expiration_test.go index dbdbe9c115f..83b5c6b6bf6 100644 --- a/common/crypto/expiration_test.go +++ b/common/crypto/expiration_test.go @@ -7,8 +7,10 @@ SPDX-License-Identifier: Apache-2.0 package crypto_test import ( + "bytes" "crypto/x509" "encoding/pem" + "errors" "fmt" "io/ioutil" "path/filepath" @@ -213,3 +215,118 @@ func TestTrackExpiration(t *testing.T) { }) } } + +func TestLogNonPubKeyMismatchErr(t *testing.T) { + ca, err := tlsgen.NewCA() + require.NoError(t, err) + + aliceKeyPair, err := ca.NewClientCertKeyPair() + require.NoError(t, err) + + bobKeyPair, err := ca.NewClientCertKeyPair() + require.NoError(t, err) + + expected := &bytes.Buffer{} + expected.WriteString(fmt.Sprintf("Failed determining if public key of %s matches public key of %s: foo", + string(aliceKeyPair.Cert), + string(bobKeyPair.Cert))) + + b := &bytes.Buffer{} + f := func(template string, args ...interface{}) { + fmt.Fprintf(b, template, args...) + } + + crypto.LogNonPubKeyMismatchErr(f, errors.New("foo"), aliceKeyPair.TLSCert.Raw, bobKeyPair.TLSCert.Raw) + + require.Equal(t, expected.String(), b.String()) +} + +func TestCertificatesWithSamePublicKey(t *testing.T) { + ca, err := tlsgen.NewCA() + require.NoError(t, err) + + bobKeyPair, err := ca.NewClientCertKeyPair() + require.NoError(t, err) + + bobCert := bobKeyPair.Cert + bob := pem2der(bobCert) + + aliceCert := `-----BEGIN CERTIFICATE----- +MIIBNjCB3KADAgECAgELMAoGCCqGSM49BAMCMBAxDjAMBgNVBAUTBUFsaWNlMB4X +DTIwMDgxODIxMzU1NFoXDTIwMDgyMDIxMzU1NFowEDEOMAwGA1UEBRMFQWxpY2Uw +WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQjZP5VD/RaczoPFbA4gkt1qb54R6SP +J/V5oxkhDboG9xWi0wpyghaMGwwxC7Q9wegEnyOVp9nXoLrQ8LUJ5BfZoycwJTAO +BgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwCgYIKoZIzj0EAwID +SQAwRgIhAK4le5XgH5edyhaQ9Sz7sFz3Zc4bbhPAzt9zQUYnoqK+AiEA5zcyLB/4 +Oqe93lroE6GF9W7UoCZFzD7lXsWku/dgFOU= +-----END CERTIFICATE-----` + + reIssuedAliceCert := `-----BEGIN CERTIFICATE----- +MIIBNDCB3KADAgECAgELMAoGCCqGSM49BAMCMBAxDjAMBgNVBAUTBUFsaWNlMB4X +DTIwMDgxODIxMzY1NFoXDTIwMDgyMDIxMzY1NFowEDEOMAwGA1UEBRMFQWxpY2Uw +WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQjZP5VD/RaczoPFbA4gkt1qb54R6SP +J/V5oxkhDboG9xWi0wpyghaMGwwxC7Q9wegEnyOVp9nXoLrQ8LUJ5BfZoycwJTAO +BgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwCgYIKoZIzj0EAwID +RwAwRAIgDc8WyXFvsxCk97KS7D/LdYJxMpDKdHNFqpzJT9LddlsCIEr8KcMd/t5p +cRv6rqxvy5M+t0DhRtiwCen70YCUsksb +-----END CERTIFICATE-----` + + alice := pem2der([]byte(aliceCert)) + aliceMakesComeback := pem2der([]byte(reIssuedAliceCert)) + + for _, test := range []struct { + description string + errContains string + first []byte + second []byte + }{ + { + description: "Bad first certificate", + errContains: "asn1:", + first: []byte{1, 2, 3}, + second: bob, + }, + + { + description: "Bad second certificate", + errContains: "asn1:", + first: alice, + second: []byte{1, 2, 3}, + }, + + { + description: "Different certificate", + errContains: crypto.ErrPubKeyMismatch.Error(), + first: alice, + second: bob, + }, + + { + description: "Same certificate", + first: alice, + second: alice, + }, + + { + description: "Same certificate but different validity period", + first: alice, + second: aliceMakesComeback, + }, + } { + t.Run(test.description, func(t *testing.T) { + err := crypto.CertificatesWithSamePublicKey(test.first, test.second) + if test.errContains != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.errContains) + return + } + + require.NoError(t, err) + }) + } +} + +func pem2der(p []byte) []byte { + b, _ := pem.Decode(p) + return b.Bytes +} diff --git a/integration/e2e/cft_test.go b/integration/e2e/cft_test.go index 768a08d08fa..8b53ad25871 100644 --- a/integration/e2e/cft_test.go +++ b/integration/e2e/cft_test.go @@ -398,7 +398,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() { By("Expiring orderer TLS certificates") for filePath, certPEM := range serverTLSCerts { - expiredCert, earlyMadeCACert := expireCertificate(certPEM, ordererTLSCACert, ordererTLSCAKey) + expiredCert, earlyMadeCACert := expireCertificate(certPEM, ordererTLSCACert, ordererTLSCAKey, time.Now()) err = ioutil.WriteFile(filePath, expiredCert, 600) Expect(err).NotTo(HaveOccurred()) @@ -530,6 +530,82 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() { By("Waiting for a leader to be elected") findLeader([]*ginkgomon.Runner{o1Runner, o2Runner, o3Runner}) }) + + It("disregards certificate renewal if only the validity period changed", func() { + config := nwo.MultiNodeEtcdRaft() + config.Channels = append(config.Channels, &nwo.Channel{Name: "foo", Profile: "TwoOrgsChannel"}) + config.Channels = append(config.Channels, &nwo.Channel{Name: "bar", Profile: "TwoOrgsChannel"}) + network = nwo.New(config, testDir, client, 33000, components) + + network.GenerateConfigTree() + network.Bootstrap() + + peer = network.Peer("Org1", "peer0") + + o1 := network.Orderer("orderer1") + o2 := network.Orderer("orderer2") + o3 := network.Orderer("orderer3") + + orderers := []*nwo.Orderer{o1, o2, o3} + + o1Runner := network.OrdererRunner(o1) + o2Runner := network.OrdererRunner(o2) + o3Runner := network.OrdererRunner(o3) + ordererRunners := []*ginkgomon.Runner{o1Runner, o2Runner, o3Runner} + + o1Proc = ifrit.Invoke(o1Runner) + o2Proc = ifrit.Invoke(o2Runner) + o3Proc = ifrit.Invoke(o3Runner) + ordererProcesses := []ifrit.Process{o1Proc, o2Proc, o3Proc} + + Eventually(o1Proc.Ready(), network.EventuallyTimeout).Should(BeClosed()) + Eventually(o2Proc.Ready(), network.EventuallyTimeout).Should(BeClosed()) + Eventually(o3Proc.Ready(), network.EventuallyTimeout).Should(BeClosed()) + + By("Waiting for them to elect a leader") + findLeader(ordererRunners) + + By("Creating a channel") + network.CreateChannel("foo", o1, peer) + + assertBlockReception(map[string]int{ + "foo": 0, + "systemchannel": 1, + }, []*nwo.Orderer{o1, o2, o3}, peer, network) + + By("Killing all orderers") + for i := range orderers { + ordererProcesses[i].Signal(syscall.SIGTERM) + Eventually(ordererProcesses[i].Wait(), network.EventuallyTimeout).Should(Receive()) + } + + By("Renewing the certificates for all orderers") + renewOrdererCertificates(network, o1, o2, o3) + + By("Starting the orderers again") + for i := range orderers { + ordererRunner := network.OrdererRunner(orderers[i]) + ordererRunners[i] = ordererRunner + ordererProcesses[i] = ifrit.Invoke(ordererRunner) + Eventually(ordererProcesses[0].Ready(), network.EventuallyTimeout).Should(BeClosed()) + } + + o1Proc = ordererProcesses[0] + o2Proc = ordererProcesses[1] + o3Proc = ordererProcesses[2] + + By("Waiting for them to elect a leader once again") + findLeader(ordererRunners) + + By("Creating a channel again") + network.CreateChannel("bar", o1, peer) + + assertBlockReception(map[string]int{ + "foo": 0, + "bar": 0, + "systemchannel": 2, + }, []*nwo.Orderer{o1, o2, o3}, peer, network) + }) }) When("admin certificate expires", func() { @@ -560,7 +636,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() { originalAdminCert, err := ioutil.ReadFile(adminCertPath) Expect(err).NotTo(HaveOccurred()) - expiredAdminCert, earlyCACert := expireCertificate(originalAdminCert, ordererCACert, ordererCAKey) + expiredAdminCert, earlyCACert := expireCertificate(originalAdminCert, ordererCACert, ordererCAKey, time.Now()) err = ioutil.WriteFile(adminCertPath, expiredAdminCert, 600) Expect(err).NotTo(HaveOccurred()) @@ -741,7 +817,34 @@ func findLeader(ordererRunners []*ginkgomon.Runner) int { return firstLeader } -func expireCertificate(certPEM, caCertPEM, caKeyPEM []byte) (expiredcertPEM []byte, earlyMadeCACertPEM []byte) { +func renewOrdererCertificates(network *nwo.Network, o1, o2, o3 *nwo.Orderer) { + ordererDomain := network.Organization(o1.Organization).Domain + ordererTLSCACertPath := filepath.Join(network.RootDir, "crypto", "ordererOrganizations", + ordererDomain, "tlsca", fmt.Sprintf("tlsca.%s-cert.pem", ordererDomain)) + ordererTLSCACert, err := ioutil.ReadFile(ordererTLSCACertPath) + Expect(err).NotTo(HaveOccurred()) + + ordererTLSCAKeyPath := filepath.Join(network.RootDir, "crypto", "ordererOrganizations", + ordererDomain, "tlsca", privateKeyFileName(ordererTLSCACert)) + + ordererTLSCAKey, err := ioutil.ReadFile(ordererTLSCAKeyPath) + Expect(err).NotTo(HaveOccurred()) + + serverTLSCerts := make(map[string][]byte) + for _, orderer := range []*nwo.Orderer{o1, o2, o3} { + tlsCertPath := filepath.Join(network.OrdererLocalTLSDir(orderer), "server.crt") + serverTLSCerts[tlsCertPath], err = ioutil.ReadFile(tlsCertPath) + Expect(err).NotTo(HaveOccurred()) + } + + for filePath, certPEM := range serverTLSCerts { + renewedCert, _ := expireCertificate(certPEM, ordererTLSCACert, ordererTLSCAKey, time.Now().Add(time.Hour)) + err = ioutil.WriteFile(filePath, renewedCert, 0600) + Expect(err).NotTo(HaveOccurred()) + } +} + +func expireCertificate(certPEM, caCertPEM, caKeyPEM []byte, expirationTime time.Time) (expiredcertPEM []byte, earlyMadeCACertPEM []byte) { keyAsDER, _ := pem.Decode(caKeyPEM) caKeyWithoutType, err := x509.ParsePKCS8PrivateKey(keyAsDER.Bytes) Expect(err).NotTo(HaveOccurred()) @@ -762,7 +865,7 @@ func expireCertificate(certPEM, caCertPEM, caKeyPEM []byte) (expiredcertPEM []by // As well as the CA certificate caCert.NotBefore = time.Now().Add((-1) * time.Hour) // The certificate expires now - cert.NotAfter = time.Now() + cert.NotAfter = expirationTime // The CA signs the certificate certBytes, err := x509.CreateCertificate(rand.Reader, cert, caCert, cert.PublicKey, caKey) diff --git a/orderer/common/cluster/comm.go b/orderer/common/cluster/comm.go index 9890566a9a7..5584635dd39 100644 --- a/orderer/common/cluster/comm.go +++ b/orderer/common/cluster/comm.go @@ -104,6 +104,7 @@ type Comm struct { Connections *ConnectionStore Chan2Members MembersByChannel Metrics *Metrics + CompareCertificate CertificateComparator } type requestContext struct { @@ -231,9 +232,9 @@ func (c *Comm) Shutdown() { c.shutdown = true for _, members := range c.Chan2Members { - for _, member := range members { - c.Connections.Disconnect(member.ServerTLSCert) - } + members.Foreach(func(id uint64, stub *Stub) { + c.Connections.Disconnect(stub.ServerTLSCert) + }) } } @@ -272,15 +273,15 @@ func (c *Comm) applyMembershipConfig(channel string, newNodes []RemoteNode) { // Remove all stubs without a corresponding node // in the new nodes - for id, stub := range mapping { + mapping.Foreach(func(id uint64, stub *Stub) { if _, exists := newNodeIDs[id]; exists { c.Logger.Info(id, "exists in both old and new membership for channel", channel, ", skipping its deactivation") - continue + return } c.Logger.Info("Deactivated node", id, "who's endpoint is", stub.Endpoint, "as it's removed from membership") - delete(mapping, id) + mapping.Remove(id) stub.Deactivate() - } + }) } // updateStubInMapping updates the given RemoteNode and adds it to the MemberMapping @@ -374,7 +375,10 @@ func (c *Comm) getOrCreateMapping(channel string) MemberMapping { // Lazily create a mapping if it doesn't already exist mapping, exists := c.Chan2Members[channel] if !exists { - mapping = make(MemberMapping) + mapping = MemberMapping{ + id2stub: make(map[uint64]*Stub), + SamePublicKey: c.CompareCertificate, + } c.Chan2Members[channel] = mapping } return mapping diff --git a/orderer/common/cluster/comm_test.go b/orderer/common/cluster/comm_test.go index e4841f4e266..a0ff62d213f 100644 --- a/orderer/common/cluster/comm_test.go +++ b/orderer/common/cluster/comm_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/crypto/tlsgen" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/metrics" @@ -266,6 +267,10 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo tstSrv.freezeCond.L = &tstSrv.lock + compareCert := cluster.CachePublicKeyComparisons(func(a, b []byte) bool { + return crypto.CertificatesWithSamePublicKey(a, b) == nil + }) + tstSrv.c = &cluster.Comm{ CertExpWarningThreshold: time.Hour, SendBufferSize: 1, @@ -275,6 +280,7 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo ChanExt: channelExtractor, Connections: cluster.NewConnectionStore(dialer, tlsConnGauge), Metrics: cluster.NewMetrics(metrics), + CompareCertificate: compareCert, } orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv) diff --git a/orderer/common/cluster/connections.go b/orderer/common/cluster/connections.go index e09245ee9c4..f75828dde94 100644 --- a/orderer/common/cluster/connections.go +++ b/orderer/common/cluster/connections.go @@ -7,11 +7,11 @@ SPDX-License-Identifier: Apache-2.0 package cluster import ( - "bytes" "crypto/x509" "sync" "sync/atomic" + "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/metrics" "github.com/pkg/errors" "google.golang.org/grpc" @@ -59,10 +59,12 @@ func NewConnectionStore(dialer SecureDialer, tlsConnectionCount metrics.Gauge) * // itself with the given TLS certificate func (c *ConnectionStore) verifyHandshake(endpoint string, certificate []byte) RemoteVerifier { return func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error { - if bytes.Equal(certificate, rawCerts[0]) { + err := crypto.CertificatesWithSamePublicKey(certificate, rawCerts[0]) + if err == nil { return nil } - return errors.Errorf("certificate presented by %s doesn't match any authorized certificate", endpoint) + return errors.Errorf("public key of server certificate presented by %s doesn't match the expected public key", + endpoint) } } diff --git a/orderer/common/cluster/util.go b/orderer/common/cluster/util.go index d08d3e09c6d..8c9c7c3e081 100644 --- a/orderer/common/cluster/util.go +++ b/orderer/common/cluster/util.go @@ -12,6 +12,7 @@ import ( "crypto/x509" "encoding/hex" "encoding/pem" + "math/rand" "sync" "sync/atomic" "time" @@ -50,27 +51,47 @@ func (cbc ConnByCertMap) Remove(cert []byte) { delete(cbc, string(cert)) } +// Size returns the size of the connections by certificate mapping func (cbc ConnByCertMap) Size() int { return len(cbc) } +// CertificateComparator returns whether some relation holds for two given certificates +type CertificateComparator func([]byte, []byte) bool + // MemberMapping defines NetworkMembers by their ID -type MemberMapping map[uint64]*Stub +// and enables to lookup stubs by a certificate +type MemberMapping struct { + id2stub map[uint64]*Stub + SamePublicKey CertificateComparator +} + +// Foreach applies the given function on all stubs in the mapping +func (mp *MemberMapping) Foreach(f func(id uint64, stub *Stub)) { + for id, stub := range mp.id2stub { + f(id, stub) + } +} // Put inserts the given stub to the MemberMapping -func (mp MemberMapping) Put(stub *Stub) { - mp[stub.ID] = stub +func (mp *MemberMapping) Put(stub *Stub) { + mp.id2stub[stub.ID] = stub +} + +// Remove removes the stub with the given ID from the MemberMapping +func (mp *MemberMapping) Remove(ID uint64) { + delete(mp.id2stub, ID) } // ByID retrieves the Stub with the given ID from the MemberMapping func (mp MemberMapping) ByID(ID uint64) *Stub { - return mp[ID] + return mp.id2stub[ID] } // LookupByClientCert retrieves a Stub with the given client certificate func (mp MemberMapping) LookupByClientCert(cert []byte) *Stub { - for _, stub := range mp { - if bytes.Equal(stub.ClientTLSCert, cert) { + for _, stub := range mp.id2stub { + if mp.SamePublicKey(stub.ClientTLSCert, cert) { return stub } } @@ -81,7 +102,7 @@ func (mp MemberMapping) LookupByClientCert(cert []byte) *Stub { // represented as strings func (mp MemberMapping) ServerCertificates() StringSet { res := make(StringSet) - for _, member := range mp { + for _, member := range mp.id2stub { res[string(member.ServerTLSCert)] = struct{}{} } return res @@ -660,3 +681,92 @@ func (exp *certificateExpirationCheck) checkExpiration(currentTime time.Time, ch exp.nodeName, exp.endpoint, channel, timeLeft) exp.lastWarning = currentTime } + +// CachePublicKeyComparisons creates CertificateComparator that caches invocations based on input arguments. +// The given CertificateComparator must be a stateless function. +func CachePublicKeyComparisons(f CertificateComparator) CertificateComparator { + m := &ComparisonMemoizer{ + MaxEntries: 4096, + F: f, + } + return m.Compare +} + +// ComparisonMemoizer speeds up comparison computations by caching past invocations of a stateless function +type ComparisonMemoizer struct { + // Configuration + F func(a, b []byte) bool + MaxEntries uint16 + // Internal state + cache map[arguments]bool + lock sync.RWMutex + once sync.Once + rand *rand.Rand +} + +type arguments struct { + a, b string +} + +// Size returns the number of computations the ComparisonMemoizer currently caches. +func (cm *ComparisonMemoizer) Size() int { + cm.lock.RLock() + defer cm.lock.RUnlock() + return len(cm.cache) +} + +// Compare compares the given two byte slices. +// It may return previous computations for the given two arguments, +// otherwise it will compute the function F and cache the result. +func (cm *ComparisonMemoizer) Compare(a, b []byte) bool { + cm.once.Do(cm.setup) + key := arguments{ + a: string(a), + b: string(b), + } + + cm.lock.RLock() + result, exists := cm.cache[key] + cm.lock.RUnlock() + + if exists { + return result + } + + result = cm.F(a, b) + + cm.lock.Lock() + defer cm.lock.Unlock() + + cm.shrinkIfNeeded() + cm.cache[key] = result + + return result +} + +func (cm *ComparisonMemoizer) shrinkIfNeeded() { + for { + currentSize := uint16(len(cm.cache)) + if currentSize < cm.MaxEntries { + return + } + cm.shrink() + } +} + +func (cm *ComparisonMemoizer) shrink() { + // Shrink the cache by 25% by removing every fourth element (on average) + for key := range cm.cache { + if cm.rand.Int()%4 != 0 { + continue + } + delete(cm.cache, key) + } +} + +func (cm *ComparisonMemoizer) setup() { + cm.lock.Lock() + defer cm.lock.Unlock() + cm.rand = rand.New(rand.NewSource(time.Now().UnixNano())) + cm.cache = make(map[arguments]bool) +} diff --git a/orderer/common/cluster/util_test.go b/orderer/common/cluster/util_test.go index 9c3a3dac8a6..9d7c3ab0f12 100644 --- a/orderer/common/cluster/util_test.go +++ b/orderer/common/cluster/util_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package cluster_test import ( + "bytes" "crypto/x509" "encoding/pem" "errors" @@ -36,6 +37,7 @@ import ( "github.com/hyperledger/fabric/protos/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -1153,3 +1155,41 @@ func injectAdditionalTLSCAEndpointPair(t *testing.T, block *common.Block, endpoi env.Payload = utils.MarshalOrPanic(payload) block.Data.Data[0] = utils.MarshalOrPanic(env) } + +func TestComparisonMemoizer(t *testing.T) { + var invocations int + + m := &cluster.ComparisonMemoizer{ + MaxEntries: 5, + F: func(a, b []byte) bool { + invocations++ + return bytes.Equal(a, b) + }, + } + + // Warm-up cache + for i := 0; i < 5; i++ { + notSame := m.Compare([]byte{byte(i)}, []byte{1, 2, 3}) + require.False(t, notSame) + require.Equal(t, i+1, invocations) + } + + // Ensure lookups are cached + for i := 0; i < 5; i++ { + notSame := m.Compare([]byte{byte(i)}, []byte{1, 2, 3}) + require.False(t, notSame) + require.Equal(t, 5, invocations) + } + + // Put a new entry which will cause a cache miss + same := m.Compare([]byte{5}, []byte{5}) + require.True(t, same) + require.Equal(t, 6, invocations) + + // Keep adding more and more elements to the cache and ensure it stays smaller than its size + for i := 0; i < 20; i++ { + odd := m.Compare([]byte{byte(1)}, []byte{byte(i % 2)}) + require.Equal(t, i%2 != 0, odd) + require.True(t, m.Size() <= int(m.MaxEntries)) + } +} diff --git a/orderer/common/server/onboarding.go b/orderer/common/server/onboarding.go index ed8f2ddb1e6..be11c8be7f5 100644 --- a/orderer/common/server/onboarding.go +++ b/orderer/common/server/onboarding.go @@ -49,7 +49,10 @@ func (ri *replicationInitiator) replicateIfNeeded(bootstrapBlock *common.Block) } func (ri *replicationInitiator) createReplicator(bootstrapBlock *common.Block, filter func(string) bool) *cluster.Replicator { - consenterCert := etcdraft.ConsenterCertificate(ri.secOpts.Certificate) + consenterCert := &etcdraft.ConsenterCertificate{ + Logger: ri.logger, + ConsenterCertificate: ri.secOpts.Certificate, + } systemChannelName, err := utils.GetChainIDFromBlock(bootstrapBlock) if err != nil { ri.logger.Panicf("Failed extracting system channel name from bootstrap block: %v", err) diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index 42580f93fa6..f03e5d0b35b 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -1328,8 +1328,13 @@ func (c *Chain) suspectEviction() bool { } func (c *Chain) newEvictionSuspector() *evictionSuspector { + consenterCertificate := &ConsenterCertificate{ + Logger: c.logger, + ConsenterCertificate: c.opts.Cert, + } + return &evictionSuspector{ - amIInChannel: ConsenterCertificate(c.opts.Cert).IsConsenterOfChannel, + amIInChannel: consenterCertificate.IsConsenterOfChannel, evictionSuspicionThreshold: c.opts.EvictionSuspicion, writeBlock: c.support.Append, createPuller: c.createPuller, diff --git a/orderer/consensus/etcdraft/consenter.go b/orderer/consensus/etcdraft/consenter.go index 4851ff85de9..806ee506061 100644 --- a/orderer/consensus/etcdraft/consenter.go +++ b/orderer/consensus/etcdraft/consenter.go @@ -7,13 +7,13 @@ SPDX-License-Identifier: Apache-2.0 package etcdraft import ( - "bytes" "path" "reflect" "time" "code.cloudfoundry.org/clock" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/metrics" "github.com/hyperledger/fabric/common/viperutil" @@ -116,7 +116,7 @@ func (c *Consenter) detectSelfID(consenters map[uint64]*etcdraft.Consenter) (uin return 0, err } - if bytes.Equal(thisNodeCertAsDER, certAsDER) { + if crypto.CertificatesWithSamePublicKey(thisNodeCertAsDER, certAsDER) == nil { return nodeID, nil } } @@ -301,16 +301,27 @@ func New( func createComm(clusterDialer *cluster.PredicateDialer, c *Consenter, config localconfig.Cluster, p metrics.Provider) *cluster.Comm { metrics := cluster.NewMetrics(p) + logger := flogging.MustGetLogger("orderer.common.cluster") + + compareCert := cluster.CachePublicKeyComparisons(func(a, b []byte) bool { + err := crypto.CertificatesWithSamePublicKey(a, b) + if err != nil && err != crypto.ErrPubKeyMismatch { + crypto.LogNonPubKeyMismatchErr(logger.Errorf, err, a, b) + } + return err == nil + }) + comm := &cluster.Comm{ MinimumExpirationWarningInterval: cluster.MinimumExpirationWarningInterval, CertExpWarningThreshold: config.CertExpirationWarningThreshold, SendBufferSize: config.SendBufferSize, - Logger: flogging.MustGetLogger("orderer.common.cluster"), + Logger: logger, Chan2Members: make(map[string]cluster.MemberMapping), Connections: cluster.NewConnectionStore(clusterDialer, metrics.EgressTLSConnectionCount), Metrics: metrics, ChanExt: c, H: c, + CompareCertificate: compareCert, } c.Communication = comm return comm diff --git a/orderer/consensus/etcdraft/consenter_test.go b/orderer/consensus/etcdraft/consenter_test.go index d7d5f59ad5c..0227a74c81f 100644 --- a/orderer/consensus/etcdraft/consenter_test.go +++ b/orderer/consensus/etcdraft/consenter_test.go @@ -35,19 +35,27 @@ import ( "go.uber.org/zap/zapcore" ) +var ( + certAsPEM []byte +) + var _ = Describe("Consenter", func() { var ( - certAsPEM []byte chainGetter *mocks.ChainGetter support *consensusmocks.FakeConsenterSupport dataDir string snapDir string walDir string - err error ) BeforeEach(func() { - certAsPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: []byte("cert bytes")}) + ca, err := tlsgen.NewCA() + Expect(err).NotTo(HaveOccurred()) + kp, err := ca.NewClientCertKeyPair() + Expect(err).NotTo(HaveOccurred()) + if certAsPEM == nil { + certAsPEM = kp.Cert + } chainGetter = &mocks.ChainGetter{} support = &consensusmocks.FakeConsenterSupport{} dataDir, err = ioutil.TempDir("", "snap-") @@ -144,8 +152,7 @@ var _ = Describe("Consenter", func() { }) It("successfully constructs a Chain", func() { - // We append a line feed to our cert, just to ensure that we can still consume it and ignore. - certAsPEMWithLineFeed := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: []byte("cert bytes")}) + certAsPEMWithLineFeed := certAsPEM certAsPEMWithLineFeed = append(certAsPEMWithLineFeed, []byte("\n")...) m := &etcdraftproto.ConfigMetadata{ Consenters: []*etcdraftproto.Consenter{ @@ -272,7 +279,7 @@ func newConsenter(chainGetter *mocks.ChainGetter) *consenter { communicator.On("Configure", mock.Anything, mock.Anything) icr := &mocks.InactiveChainRegistry{} icr.On("TrackChain", "foo", mock.Anything, mock.Anything) - certAsPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: []byte("cert bytes")}) + c := &etcdraft.Consenter{ InactiveChainRegistry: icr, Communication: communicator, diff --git a/orderer/consensus/etcdraft/util.go b/orderer/consensus/etcdraft/util.go index e9685c3b8bc..bcbc5cd7e27 100644 --- a/orderer/consensus/etcdraft/util.go +++ b/orderer/consensus/etcdraft/util.go @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0 package etcdraft import ( - "bytes" "crypto/x509" "encoding/pem" "fmt" @@ -18,6 +17,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/common/channelconfig" "github.com/hyperledger/fabric/common/configtx" + "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/orderer/common/cluster" "github.com/hyperledger/fabric/orderer/common/localconfig" @@ -444,14 +444,19 @@ func validateCert(pemData []byte, certRole string) error { } // ConsenterCertificate denotes a TLS certificate of a consenter -type ConsenterCertificate []byte +type ConsenterCertificate struct { + ConsenterCertificate []byte + Logger *flogging.FabricLogger +} + +// type ConsenterCertificate []byte // IsConsenterOfChannel returns whether the caller is a consenter of a channel // by inspecting the given configuration block. // It returns nil if true, else returns an error. func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *common.Block) error { - if configBlock == nil { - return errors.New("nil block") + if configBlock == nil || configBlock.Header == nil { + return errors.New("nil block or nil header") } envelopeConfig, err := utils.ExtractEnvelope(configBlock, 0) if err != nil { @@ -470,11 +475,38 @@ func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *common.Blo return err } + bl, _ := pem.Decode(conCert.ConsenterCertificate) + if bl == nil { + return errors.Errorf("my consenter certificate %s is not a valid PEM", string(conCert.ConsenterCertificate)) + } + + myCertDER := bl.Bytes + + var failedMatches []string for _, consenter := range m.Consenters { - if bytes.Equal(conCert, consenter.ServerTlsCert) || bytes.Equal(conCert, consenter.ClientTlsCert) { + candidateBlock, _ := pem.Decode(consenter.ServerTlsCert) + if candidateBlock == nil { + return errors.Errorf("candidate server certificate %s is not a valid PEM", string(consenter.ServerTlsCert)) + } + sameServerCertErr := crypto.CertificatesWithSamePublicKey(myCertDER, candidateBlock.Bytes) + + candidateBlock, _ = pem.Decode(consenter.ClientTlsCert) + if candidateBlock == nil { + return errors.Errorf("candidate client certificate %s is not a valid PEM", string(consenter.ClientTlsCert)) + } + sameClientCertErr := crypto.CertificatesWithSamePublicKey(myCertDER, candidateBlock.Bytes) + + if sameServerCertErr == nil || sameClientCertErr == nil { return nil } + conCert.Logger.Debugf("I am not %s:%d because %s, %s", consenter.Host, consenter.Port, sameServerCertErr, sameClientCertErr) + failedMatches = append(failedMatches, string(consenter.ClientTlsCert)) } + conCert.Logger.Debugf("Failed matching our certificate %s against certificates encoded in config block %d: %v", + string(conCert.ConsenterCertificate), + configBlock.Header.Number, + failedMatches) + return cluster.ErrNotInChannel } diff --git a/orderer/consensus/etcdraft/util_test.go b/orderer/consensus/etcdraft/util_test.go index 1e8f60631ba..cfe17cad69a 100644 --- a/orderer/consensus/etcdraft/util_test.go +++ b/orderer/consensus/etcdraft/util_test.go @@ -30,6 +30,7 @@ import ( "github.com/onsi/gomega" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/raftpb" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -74,7 +75,13 @@ func TestIsConsenterOfChannel(t *testing.T) { "BQUFBQUFBQUFBQUFBQUFBQUFBQUFCTUFvR0NDcUdTTTQ5QkFNQ0EwY0FNRVFDCklFckJZRFVzV0JwOHB0ZVFSaTZyNjNVelhJQi81Sn" + "YxK0RlTkRIUHc3aDljQWlCakYrM3V5TzBvMEdRclB4MEUKUWptYlI5T3BVREN2LzlEUkNXWU9GZitkVlE9PQotLS0tLUVORCBDRVJUSU" + "ZJQ0FURS0tLS0tCg==") - assert.NoError(t, err) + require.NoError(t, err) + + ca, err := tlsgen.NewCA() + require.NoError(t, err) + + kp, err := ca.NewClientCertKeyPair() + require.NoError(t, err) validBlock := func() *common.Block { b, err := ioutil.ReadFile(filepath.Join("testdata", "etcdraftgenesis.block")) @@ -92,18 +99,24 @@ func TestIsConsenterOfChannel(t *testing.T) { }{ { name: "nil block", - expectedError: "nil block", + expectedError: "nil block or nil header", + }, + { + name: "nil header", + expectedError: "nil block or nil header", + configBlock: &common.Block{}, }, { name: "no block data", expectedError: "block data is nil", - configBlock: &common.Block{}, + configBlock: &common.Block{Header: &common.BlockHeader{}}, }, { name: "invalid envelope inside block", expectedError: "failed to unmarshal payload from envelope:" + " error unmarshaling Payload: proto: common.Payload: illegal tag 0 (wire type 1)", configBlock: &common.Block{ + Header: &common.BlockHeader{}, Data: &common.BlockData{ Data: [][]byte{utils.MarshalOrPanic(&common.Envelope{ Payload: []byte{1, 2, 3}, @@ -114,7 +127,7 @@ func TestIsConsenterOfChannel(t *testing.T) { { name: "valid config block with cert mismatch", configBlock: validBlock(), - certificate: certInsideConfigBlock[2:], + certificate: kp.Cert, expectedError: cluster.ErrNotInChannel.Error(), }, { @@ -124,7 +137,11 @@ func TestIsConsenterOfChannel(t *testing.T) { }, } { t.Run(testCase.name, func(t *testing.T) { - err := ConsenterCertificate(testCase.certificate).IsConsenterOfChannel(testCase.configBlock) + consenterCertificate := &ConsenterCertificate{ + Logger: flogging.MustGetLogger("test"), + ConsenterCertificate: testCase.certificate, + } + err = consenterCertificate.IsConsenterOfChannel(testCase.configBlock) if testCase.expectedError != "" { assert.EqualError(t, err, testCase.expectedError) } else {