Skip to content

Commit

Permalink
[FAB-6012] Custom channel membership filtering
Browse files Browse the repository at this point in the history
This commit adds an ability to query gossip for channel members
and pass a custom filter that filters out peers according to
their identities and signatures.

Change-Id: Ibad6b2ef257864b2d980cc1338f4a41be4b33eb9
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Sep 2, 2017
1 parent 93f3c9b commit 34eb8fe
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 6 deletions.
8 changes: 8 additions & 0 deletions gossip/api/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,11 @@ type PeerSuspector func(identity PeerIdentityType) bool
// PeerSecureDialOpts returns the gRPC DialOptions to use for connection level
// security when communicating with remote peer endpoints
type PeerSecureDialOpts func() []grpc.DialOption

// PeerSignature defines a signature of a peer
// on a given message
type PeerSignature struct {
Signature []byte
Message []byte
PeerIdentity PeerIdentityType
}
7 changes: 4 additions & 3 deletions gossip/api/subchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import "github.com/hyperledger/fabric/gossip/common"
// or which peers are eligible of receiving a certain message
type RoutingFilter func(peerIdentity PeerIdentityType) bool

// CollectionCriteria describes a certain sub-channel, or a part of it
type CollectionCriteria []byte
// CollectionCriteria describes a way of selecting peers from a sub-channel
// given their signatures
type SubChannelSelectionCriteria func(signature PeerSignature) bool

// RoutingFilterFactory defines an object that given a CollectionCriteria and a channel,
// it can ascertain which peers should be aware of the data related to the
// CollectionCriteria.
type RoutingFilterFactory interface {
// Peers returns a RoutingFilter for given chainID and CollectionCriteria
Peers(common.ChainID, CollectionCriteria) RoutingFilter
Peers(common.ChainID, SubChannelSelectionCriteria) RoutingFilter
}
10 changes: 10 additions & 0 deletions gossip/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,13 @@ func SelectPeers(k int, peerPool []discovery.NetworkMember, filter RoutingFilter

return remotePeers
}

// First returns the first peer that matches the given filter
func First(peerPool []discovery.NetworkMember, filter RoutingFilter) *comm.RemotePeer {
for _, p := range peerPool {
if filter(p) {
return &comm.RemotePeer{PKIID: p.PKIid, Endpoint: p.PreferredEndpoint()}
}
}
return nil
}
22 changes: 22 additions & 0 deletions gossip/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package filter
import (
"testing"

"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -36,6 +37,27 @@ func TestCombineRoutingFilters(t *testing.T) {
assert.False(t, CombineRoutingFilters(a, b)(discovery.NetworkMember{InternalEndpoint: "b"}))
}

func TestFirst(t *testing.T) {
peerA := discovery.NetworkMember{Endpoint: "a"}
peerB := discovery.NetworkMember{Endpoint: "b"}
peers := []discovery.NetworkMember{peerA, peerB}
assert.Equal(t, &comm.RemotePeer{Endpoint: "a"}, First(peers, func(discovery.NetworkMember) bool {
return true
}))

assert.Equal(t, &comm.RemotePeer{Endpoint: "b"}, First(peers, func(nm discovery.NetworkMember) bool {
return nm.PreferredEndpoint() == "b"
}))

peerAA := discovery.NetworkMember{Endpoint: "aa"}
peerAB := discovery.NetworkMember{Endpoint: "ab"}
peers = append(peers, peerAA)
peers = append(peers, peerAB)
assert.Equal(t, &comm.RemotePeer{Endpoint: "aa"}, First(peers, func(nm discovery.NetworkMember) bool {
return len(nm.PreferredEndpoint()) > 1
}))
}

func TestSelectPeers(t *testing.T) {
a := func(nm discovery.NetworkMember) bool {
return nm.Endpoint == "a"
Expand Down
26 changes: 26 additions & 0 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type GossipChannel interface {
// GetPeers returns a list of peers with metadata as published by them
GetPeers() []discovery.NetworkMember

// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria
PeerFilter(api.SubChannelSelectionCriteria) filter.RoutingFilter

// IsMemberInChan checks whether the given member is eligible to be in the channel
IsMemberInChan(member discovery.NetworkMember) bool

Expand Down Expand Up @@ -372,6 +376,27 @@ func (gc *gossipChannel) IsMemberInChan(member discovery.NetworkMember) bool {
return gc.IsOrgInChannel(org)
}

// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria
func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCriteria) filter.RoutingFilter {
return func(member discovery.NetworkMember) bool {
peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
if len(peerIdentity) == 0 {
return false
}
msg := gc.stateInfoMsgStore.MembershipStore.MsgByID(member.PKIid)
if msg == nil {
return false
}

return messagePredicate(api.PeerSignature{
Message: msg.Payload,
Signature: msg.Signature,
PeerIdentity: peerIdentity,
})
}
}

// IsOrgInChannel returns whether the given organization is in the channel
func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
gc.RLock()
Expand Down Expand Up @@ -722,6 +747,7 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
if !msg.IsStateInfoMsg() {
return
}

gc.Lock()
defer gc.Unlock()

Expand Down
29 changes: 26 additions & 3 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package channel

import (
"bytes"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -1427,6 +1428,7 @@ func TestGossipChannelEligibility(t *testing.T) {
{PKIid: pkiIDInOrg1},
{PKIid: pkiIDInOrg1ButNotEligible},
{PKIid: pkiIDinOrg2},
{PKIid: pkiIDinOrg3},
}
adapter.On("GetMembership").Return(members)
adapter.On("Gossip", mock.Anything)
Expand All @@ -1453,15 +1455,36 @@ func TestGossipChannelEligibility(t *testing.T) {
})
// Every peer sends a StateInfo message
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1ButNotEligible, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg3, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDinOrg2, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1ButNotEligible, msg: createStateInfoMsg(1, pkiIDInOrg1ButNotEligible, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDinOrg3, msg: createStateInfoMsg(1, pkiIDinOrg3, channelA)})

assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))

// Ensure peers from the channel are returned
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return true
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return true
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
// But not peers which aren't in the channel
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return true
})(discovery.NetworkMember{PKIid: pkiIDinOrg3}))

// Ensure the given predicate is considered
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))

assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))

// Remove org2 from the channel
gc.ConfigureChannel(&joinChanMsg{
members2AnchorPeers: map[string][]api.AnchorPeer{
Expand Down
5 changes: 5 additions & 0 deletions gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/filter"
proto "github.com/hyperledger/fabric/protos/gossip"
)

Expand Down Expand Up @@ -42,6 +43,10 @@ type Gossip interface {
// Gossip sends a message to other peers to the network
Gossip(msg *proto.GossipMessage)

// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria, and that they published their channel participation
PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error)

// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
Expand Down
10 changes: 10 additions & 0 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,16 @@ func (g *gossipServiceImpl) PeersOfChannel(channel common.ChainID) []discovery.N
return gc.GetPeers()
}

// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria, and that they published their channel participation
func (g *gossipServiceImpl) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
gc := g.chanState.getGossipChannelByChainID(channel)
if gc == nil {
return nil, errors.Errorf("Channel %s doesn't exist", string(channel))
}
return gc.PeerFilter(messagePredicate), nil
}

// Stop stops the gossip component
func (g *gossipServiceImpl) Stop() {
if g.toDie() {
Expand Down
5 changes: 5 additions & 0 deletions gossip/service/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/filter"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/peer"
Expand All @@ -38,6 +39,10 @@ type gossipMock struct {
mock.Mock
}

func (*gossipMock) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
panic("implement me")
}

func (*gossipMock) SuspectPeers(s api.PeerSuspector) {
panic("implement me")
}
Expand Down
5 changes: 5 additions & 0 deletions gossip/state/mocks/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/filter"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/stretchr/testify/mock"
)
Expand All @@ -19,6 +20,10 @@ type GossipMock struct {
mock.Mock
}

func (*GossipMock) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
panic("implement me")
}

func (g *GossipMock) SuspectPeers(s api.PeerSuspector) {
g.Called(s)

Expand Down

0 comments on commit 34eb8fe

Please sign in to comment.