From 4579ed14bc98d3448affb824c81cda7ce8b8695f Mon Sep 17 00:00:00 2001 From: YACOVM Date: Mon, 27 Feb 2017 16:22:31 +0200 Subject: [PATCH] [FAB-2007] Gossip: External and internal endpoints I Intro: An organization might want to publish external endpoints for other organizations, but use internal endpoints (intranet) for communication between peers inside the organization. At the same time, an organization might not want to leak information about its internal addresses to other organizations. A peer has 2 endpoints when it is configured: 1) Internal endpoint (exists anyway) 2) External endpoint (might be configured) Only peers that have an external endpoint configured are supposed to be visible to peers outside the organization. What's in this commit? This commit addresses this deal in the discovery layer: When a membership request message reaches a peer, it grabs all alive messages it posseses and sends them to the remote peer in a membership response message. Both messages are point-to-point (not "gossiped"/broadcasted). And need to be created in such a way to: 1) Not tell about peers that have no external endpoint 2) Not leak internal endpoints to peers outside the org This commit adds a policy to the discovery layer that enables: 1) Filter (Sieve): Only to include peers that hold some criteria in the membership response message. 2) Message mutator (Disjoiner): removes fields of the messages sent to remote peers that shouldn't be exposed to the remote peer. How is it tested? I wrote a test that simulates 2 organizations, and a disclosure policy that fits what is going to be done in the next commit in the gossip layer (the layer above). The test checks conditions (1) and (2). Signed-off-by: Yacov Manevich Change-Id: Iade3d32b0d2a58400734b76c30189474c001718b --- gossip/discovery/discovery.go | 20 +++ gossip/discovery/discovery_impl.go | 104 ++++++++------- gossip/discovery/discovery_test.go | 201 +++++++++++++++++++++++++++-- gossip/gossip/gossip_impl.go | 14 +- 4 files changed, 281 insertions(+), 58 deletions(-) diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index aa383aed146..e30305e6a21 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -30,6 +30,26 @@ type CryptoService interface { SignMessage(m *proto.GossipMessage, internalEndpoint string) *proto.Envelope } +// EnvelopeFilter may or may not remove part of the Envelope +// that the given SignedGossipMessage originates from. +type EnvelopeFilter func(message *proto.SignedGossipMessage) *proto.Envelope + +// Sieve defines the messages that are allowed to be sent to some remote peer, +// based on some criteria. +// Returns whether the sieve permits sending a given message. +type Sieve func(message *proto.SignedGossipMessage) bool + +// DisclosurePolicy defines which messages a given remote peer +// is eligible of knowing about, and also what is it eligible +// to know about out of a given SignedGossipMessage. +// Returns: +// 1) A Sieve for a given remote peer. +// The Sieve is applied for each peer in question and outputs +// whether the message should be disclosed to the remote peer. +// 2) A EnvelopeFilter for a given SignedGossipMessage, which may remove +// part of the Envelope the SignedGossipMessage originates from +type DisclosurePolicy func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) + // CommService is an interface that the discovery expects to be implemented and passed on creation type CommService interface { // Gossip gossips a message diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index db7f99af2fe..cc56541c074 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -84,28 +84,30 @@ type gossipDiscoveryImpl struct { crypt CryptoService lock *sync.RWMutex - toDieChan chan struct{} - toDieFlag int32 - logger *logging.Logger + toDieChan chan struct{} + toDieFlag int32 + logger *logging.Logger + disclosurePolicy DisclosurePolicy } // NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed -func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery { +func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy) Discovery { d := &gossipDiscoveryImpl{ - self: self, - incTime: uint64(time.Now().UnixNano()), - seqNum: uint64(0), - deadLastTS: make(map[string]*timestamp), - aliveLastTS: make(map[string]*timestamp), - id2Member: make(map[string]*NetworkMember), - aliveMembership: util.NewMembershipStore(), - deadMembership: util.NewMembershipStore(), - crypt: crypt, - comm: comm, - lock: &sync.RWMutex{}, - toDieChan: make(chan struct{}, 1), - toDieFlag: int32(0), - logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint), + self: self, + incTime: uint64(time.Now().UnixNano()), + seqNum: uint64(0), + deadLastTS: make(map[string]*timestamp), + aliveLastTS: make(map[string]*timestamp), + id2Member: make(map[string]*NetworkMember), + aliveMembership: util.NewMembershipStore(), + deadMembership: util.NewMembershipStore(), + crypt: crypt, + comm: comm, + lock: &sync.RWMutex{}, + toDieChan: make(chan struct{}, 1), + toDieFlag: int32(0), + logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint), + disclosurePolicy: disPol, } go d.periodicalSendAlive() @@ -307,7 +309,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { // Sending a membership response to a peer may block this routine // in case the sending is deliberately slow (i.e attack). // will keep this async until I'll write a timeout detector in the comm layer - go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, memReq.Known, internalEndpoint) + go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint) return } @@ -352,19 +354,27 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { } } -func (d *gossipDiscoveryImpl) sendMemResponse(member *proto.Member, known [][]byte, internalEndpoint string) { - d.logger.Debug("Entering", member) +func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, internalEndpoint string) { + d.logger.Debug("Entering", targetMember) - memResp := d.createMembershipResponse(known) + targetPeer := &NetworkMember{ + Endpoint: targetMember.Endpoint, + Metadata: targetMember.Metadata, + PKIid: targetMember.PkiID, + InternalEndpoint: internalEndpoint, + } + + memResp := d.createMembershipResponse(targetPeer) + if memResp == nil { + errMsg := `Got a membership request from a peer that shouldn't have sent one: %v, closing connection to the peer as a result.` + d.logger.Warningf(errMsg, targetMember) + d.comm.CloseConn(targetPeer) + return + } defer d.logger.Debug("Exiting, replying with", memResp) - d.comm.SendToPeer(&NetworkMember{ - Endpoint: member.Endpoint, - Metadata: member.Metadata, - PKIid: member.PkiID, - InternalEndpoint: internalEndpoint, - }, (&proto.GossipMessage{ + d.comm.SendToPeer(targetPeer, (&proto.GossipMessage{ Tag: proto.GossipMessage_EMPTY, Nonce: uint64(0), Content: &proto.GossipMessage_MemRes{ @@ -373,36 +383,37 @@ func (d *gossipDiscoveryImpl) sendMemResponse(member *proto.Member, known [][]by }).NoopSign()) } -func (d *gossipDiscoveryImpl) createMembershipResponse(known [][]byte) *proto.MembershipResponse { +func (d *gossipDiscoveryImpl) createMembershipResponse(targetMember *NetworkMember) *proto.MembershipResponse { + shouldBeDisclosed, omitConcealedFields := d.disclosurePolicy(targetMember) aliveMsg := d.createAliveMessage() + if !shouldBeDisclosed(aliveMsg) { + return nil + } + d.lock.RLock() defer d.lock.RUnlock() deadPeers := []*proto.Envelope{} for _, dm := range d.deadMembership.ToSlice() { - isKnown := false - for _, knownPeer := range known { - if equalPKIid(knownPeer, dm.GetAliveMsg().Membership.PkiID) { - isKnown = true - break - } - } - if !isKnown { - deadPeers = append(deadPeers, dm.Envelope) - break + + if !shouldBeDisclosed(dm) { + continue } + deadPeers = append(deadPeers, omitConcealedFields(dm)) } - aliveMembersAsSlice := d.aliveMembership.ToSlice() - aliveSnapshot := make([]*proto.Envelope, len(aliveMembersAsSlice)) - for i, msg := range aliveMembersAsSlice { - aliveSnapshot[i] = msg.Envelope + var aliveSnapshot []*proto.Envelope + for _, am := range d.aliveMembership.ToSlice() { + if !shouldBeDisclosed(am) { + continue + } + aliveSnapshot = append(aliveSnapshot, omitConcealedFields(am)) } return &proto.MembershipResponse{ - Alive: append(aliveSnapshot, aliveMsg.Envelope), + Alive: append(aliveSnapshot, omitConcealedFields(aliveMsg)), Dead: deadPeers, } } @@ -536,7 +547,10 @@ func (d *gossipDiscoveryImpl) sendMembershipRequest(member *NetworkMember) { func (d *gossipDiscoveryImpl) createMembershipRequest() *proto.SignedGossipMessage { req := &proto.MembershipRequest{ SelfInformation: d.createAliveMessage().Envelope, - Known: d.getKnownPeers(), + // TODO: sending the known peers is not secure because the remote peer might shouldn't know + // TODO: about the known peers. I'm deprecating this until a secure mechanism will be implemented. + // TODO: See FAB-2570 for tracking this issue. + Known: [][]byte{}, } return (&proto.GossipMessage{ Tag: proto.GossipMessage_EMPTY, diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index dd0d4d325d5..46f4f0b6295 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -20,6 +20,8 @@ import ( "fmt" "io" "net" + "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -69,9 +71,12 @@ func (m *gossipMsg) GetGossipMessage() *proto.GossipMessage { type gossipInstance struct { comm *dummyCommModule Discovery - gRGCserv *grpc.Server - lsnr net.Listener - shouldGossip bool + gRGCserv *grpc.Server + lsnr net.Listener + shouldGossip bool + syncInitiator *time.Ticker + stopChan chan struct{} + port int } func (comm *dummyCommModule) ValidateAliveMsg(am *proto.SignedGossipMessage) bool { @@ -174,6 +179,22 @@ func (comm *dummyCommModule) CloseConn(peer *NetworkMember) { comm.conns[peer.Endpoint].Close() } +func (g *gossipInstance) initiateSync(frequency time.Duration, peerNum int) { + g.syncInitiator = time.NewTicker(frequency) + g.stopChan = make(chan struct{}) + go func() { + for { + select { + case <-g.syncInitiator.C: + g.Discovery.InitiateSync(peerNum) + case <-g.stopChan: + g.syncInitiator.Stop() + return + } + } + }() +} + func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) error { for { envelope, err := stream.Recv() @@ -225,6 +246,9 @@ func (g *gossipInstance) tryForwardMessage(msg *proto.SignedGossipMessage) { } func (g *gossipInstance) Stop() { + if g.syncInitiator != nil { + g.stopChan <- struct{}{} + } g.gRGCserv.Stop() g.lsnr.Close() for _, stream := range g.comm.streams { @@ -240,15 +264,27 @@ func (g *gossipInstance) Ping(context.Context, *proto.Empty) (*proto.Empty, erro return &proto.Empty{}, nil } +var noopPolicy = func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) { + return func(msg *proto.SignedGossipMessage) bool { + return true + }, func(message *proto.SignedGossipMessage) *proto.Envelope { + return message.Envelope + } +} + func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance { - return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true) + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true, noopPolicy) } func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance { - return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false) + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, noopPolicy) } -func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool) *gossipInstance { +func createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(port int, id string, bootstrapPeers []string, pol DisclosurePolicy) *gossipInstance { + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, pol) +} + +func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy) *gossipInstance { comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), @@ -276,8 +312,8 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st } s := grpc.NewServer() - discSvc := NewDiscoveryService(bootstrapPeers, self, comm, comm) - gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip} + discSvc := NewDiscoveryService(bootstrapPeers, self, comm, comm, pol) + gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip, port: port} proto.RegisterGossipServer(s, gossInst) go s.Serve(ll) @@ -472,7 +508,6 @@ func TestGossipDiscoveryStopping(t *testing.T) { inst := createDiscoveryInstance(9611, "d1", []string{bootPeer(9611)}) time.Sleep(time.Second) waitUntilOrFailBlocking(t, inst.Stop) - } func TestGossipDiscoverySkipConnectingToLocalhostBootstrap(t *testing.T) { @@ -524,6 +559,140 @@ func TestConvergence(t *testing.T) { stopInstances(t, instances) } +func TestDisclosurePolicyWithPull(t *testing.T) { + t.Parallel() + // Scenario: run 2 groups of peers that simulate 2 organizations: + // {p0, p1, p2, p3, p4} + // {p5, p6, p7, p8, p9} + // Only peers that have an even id have external addresses + // and only these peers should be published to peers of the other group, + // while the only ones that need to know about them are peers + // that have an even id themselves. + // Furthermore, peers in different sets, should not know about internal addresses of + // other peers. + + // This is a bootstrap map that matches for each peer its own bootstrap peer. + // In practice (production) peers should only use peers of their orgs as bootstrap peers, + // but the discovery layer is ignorant of organizations. + bootPeerMap := map[int]int{ + 8610: 8616, + 8611: 8610, + 8612: 8610, + 8613: 8610, + 8614: 8610, + 8615: 8616, + 8616: 8610, + 8617: 8616, + 8618: 8616, + 8619: 8616, + } + + // This map matches each peer, the peers it should know about in the test scenario. + peersThatShouldBeKnownToPeers := map[int][]int{ + 8610: {8611, 8612, 8613, 8614, 8616, 8618}, + 8611: {8610, 8612, 8613, 8614}, + 8612: {8610, 8611, 8613, 8614, 8616, 8618}, + 8613: {8610, 8611, 8612, 8614}, + 8614: {8610, 8611, 8612, 8613, 8616, 8618}, + 8615: {8616, 8617, 8618, 8619}, + 8616: {8610, 8612, 8614, 8615, 8617, 8618, 8619}, + 8617: {8615, 8616, 8618, 8619}, + 8618: {8610, 8612, 8614, 8615, 8616, 8617, 8619}, + 8619: {8615, 8616, 8617, 8618}, + } + // Create the peers in the two groups + instances1, instances2 := createDisjointPeerGroupsWithNoGossip(bootPeerMap) + // Sleep a while to let them establish membership. This time should be more than enough + // because the instances are configured to pull membership in very high frequency from + // up to 10 peers (which results in - pulling from everyone) + time.Sleep(time.Second * 5) + for _, inst := range append(instances1, instances2...) { + portsOfKnownMembers := portsOfMembers(inst.GetMembership()) + // Ensure the expected membership is equal to the actual membership + // of each peer. the portsOfMembers returns a sorted slice so assert.Equal does the job. + assert.Equal(t, peersThatShouldBeKnownToPeers[inst.port], portsOfKnownMembers) + // Next, check that internal endpoints aren't leaked across groups, + for _, knownPeer := range inst.GetMembership() { + // If internal endpoint is known, ensure the peers are in the same group + // unless the peer in question is a peer that has a public address. + // We cannot control what we disclose about ourselves when we send a membership request + if len(knownPeer.InternalEndpoint) > 0 && inst.port%2 != 0 { + bothInGroup1 := portOfEndpoint(knownPeer.Endpoint) < 8615 && inst.port < 8615 + bothInGroup2 := portOfEndpoint(knownPeer.Endpoint) >= 8615 && inst.port >= 8615 + assert.True(t, bothInGroup1 || bothInGroup2, "%v knows about %v's internal endpoint", inst.port, knownPeer.InternalEndpoint) + } + } + } + + t.Log("Shutting down instance 0...") + // Now, we shutdown instance 0 and ensure that peers that shouldn't know it, + // do not know it via membership requests + stopInstances(t, []*gossipInstance{instances1[0]}) + time.Sleep(time.Second * 3) + for _, inst := range append(instances1[1:], instances2...) { + if peersThatShouldBeKnownToPeers[inst.port][0] == 8610 { + assert.Equal(t, 1, inst.Discovery.(*gossipDiscoveryImpl).deadMembership.Size()) + } else { + assert.Equal(t, 0, inst.Discovery.(*gossipDiscoveryImpl).deadMembership.Size()) + } + } + stopInstances(t, instances1[1:]) + stopInstances(t, instances2) +} + +func createDisjointPeerGroupsWithNoGossip(bootPeerMap map[int]int) ([]*gossipInstance, []*gossipInstance) { + instances1 := []*gossipInstance{} + instances2 := []*gossipInstance{} + for group := 0; group < 2; group++ { + for i := 0; i < 5; i++ { + group := group + id := fmt.Sprintf("id%d", group*5+i) + port := 8610 + group*5 + i + bootPeers := []string{bootPeer(bootPeerMap[port])} + pol := discPolForPeer(port) + inst := createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(8610+group*5+i, id, bootPeers, pol) + inst.initiateSync(getAliveExpirationTimeout()/3, 10) + if group == 0 { + instances1 = append(instances1, inst) + } else { + instances2 = append(instances2, inst) + } + } + } + return instances1, instances2 +} + +func discPolForPeer(selfPort int) DisclosurePolicy { + return func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) { + targetPortStr := strings.Split(remotePeer.Endpoint, ":")[1] + targetPort, _ := strconv.ParseInt(targetPortStr, 10, 64) + return func(msg *proto.SignedGossipMessage) bool { + portOfAliveMsgStr := strings.Split(msg.GetAliveMsg().Membership.Endpoint, ":")[1] + portOfAliveMsg, _ := strconv.ParseInt(portOfAliveMsgStr, 10, 64) + + if portOfAliveMsg < 8615 && targetPort < 8615 { + return true + } + if portOfAliveMsg >= 8615 && targetPort >= 8615 { + return true + } + + // Else, expose peers with even ids to other peers with even ids + return portOfAliveMsg%2 == 0 && targetPort%2 == 0 + }, func(msg *proto.SignedGossipMessage) *proto.Envelope { + if selfPort < 8615 && targetPort >= 8615 { + msg.Envelope.SecretEnvelope = nil + } + + if selfPort >= 8615 && targetPort < 8615 { + msg.Envelope.SecretEnvelope = nil + } + + return msg.Envelope + } + } +} + func TestConfigFromFile(t *testing.T) { preAliveTimeInterval := getAliveTimeInterval() preAliveExpirationTimeout := getAliveExpirationTimeout() @@ -628,3 +797,17 @@ func assertMembership(t *testing.T, instances []*gossipInstance, expectedNum int } waitUntilOrFail(t, fullMembership) } + +func portsOfMembers(members []NetworkMember) []int { + ports := make([]int, len(members)) + for i := range members { + ports[i] = portOfEndpoint(members[i].Endpoint) + } + sort.Ints(ports) + return ports +} + +func portOfEndpoint(endpoint string) int { + port, _ := strconv.ParseInt(strings.Split(endpoint, ":")[1], 10, 64) + return int(port) +} diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 45d1911d1ef..7e325007285 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -19,14 +19,13 @@ package gossip import ( "bytes" "crypto/tls" + "errors" "fmt" + "reflect" "sync" "sync/atomic" "time" - "errors" - "reflect" - "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" @@ -121,7 +120,14 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis g.discAdapter = g.newDiscoveryAdapter() g.disSecAdap = g.newDiscoverySecurityAdapter() - g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap) + noopDisclosurePol := func(remotePeer *discovery.NetworkMember) (discovery.Sieve, discovery.EnvelopeFilter) { + return func(msg *proto.SignedGossipMessage) bool { + return true + }, func(message *proto.SignedGossipMessage) *proto.Envelope { + return message.Envelope + } + } + g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap, noopDisclosurePol) g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember()) g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)