Skip to content

Commit

Permalink
[FABG-34] discovery:skip new peer till config refresh
Browse files Browse the repository at this point in the history
- peer from new org will be excluded from discovered peers
list if its tlscert is not being updated in certpool by memebership.
- new function ContainsMSP(msp string) is added to ChannelMembership


Change-Id: I13c2cd2842dde7987e45be0792fe3a41ccd0c757
Signed-off-by: Sudesh Shetty <[email protected]>
  • Loading branch information
sudeshrshetty committed Aug 7, 2018
1 parent 72a486f commit 0424521
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 27 deletions.
21 changes: 14 additions & 7 deletions pkg/client/common/discovery/dynamicdiscovery/chservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,23 @@ import (
// are currently joined to the given channel.
type ChannelService struct {
*service
channelID string
channelID string
membership fab.ChannelMembership
}

// NewChannelService creates a Discovery Service to query the list of member peers on a given channel.
func NewChannelService(ctx contextAPI.Client, channelID string, opts ...coptions.Opt) (*ChannelService, error) {
func NewChannelService(ctx contextAPI.Client, membership fab.ChannelMembership, channelID string, opts ...coptions.Opt) (*ChannelService, error) {
logger.Debug("Creating new dynamic discovery service")
s := &ChannelService{
channelID: channelID,
channelID: channelID,
membership: membership,
}
s.service = newService(ctx.EndpointConfig(), s.queryPeers, opts...)
err := s.service.initialize(ctx)
if err != nil {
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -118,10 +121,14 @@ func (s *ChannelService) asPeers(ctx contextAPI.Client, endpoints []*discclient.
if !ok {
continue
}
peers = append(peers, &peerEndpoint{
Peer: peer,
blockHeight: endpoint.StateInfoMessage.GetStateInfo().GetProperties().LedgerHeight,
})

//check if cache is updated with tlscert if this is a new org joined and membership is not done yet updating cache
if s.membership.ContainsMSP(peer.MSPID()) {
peers = append(peers, &peerEndpoint{
Peer: peer,
blockHeight: endpoint.StateInfoMessage.GetStateInfo().GetProperties().LedgerHeight,
})
}
}
return peers
}
Expand Down
103 changes: 102 additions & 1 deletion pkg/client/common/discovery/dynamicdiscovery/chservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDiscoveryService(t *testing.T) {
}

service, err := NewChannelService(
ctx, ch,
ctx, mocks.NewMockMembership(), ch,
WithRefreshInterval(500*time.Millisecond),
WithResponseTimeout(2*time.Second),
)
Expand Down Expand Up @@ -122,6 +122,107 @@ func TestDiscoveryService(t *testing.T) {
require.Equalf(t, 1, len(peers), "expecting discovery filter to return only one peer")
}

func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {

ctx := mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1))

config := &config{
EndpointConfig: mocks.NewMockEndpointConfig(),
peers: []pfab.ChannelPeer{
{
NetworkPeer: pfab.NetworkPeer{
PeerConfig: pfab.PeerConfig{
URL: peer1MSP1,
},
MSPID: mspID1,
},
},
{
NetworkPeer: pfab.NetworkPeer{
PeerConfig: pfab.PeerConfig{
URL: peer1MSP2,
},
MSPID: mspID2,
},
},
},
}
ctx.SetEndpointConfig(config)

discClient := clientmocks.NewMockDiscoveryClient()
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{},
},
)

clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
return discClient, nil
}

service, err := NewChannelService(
ctx,
mocks.NewMockMembershipWithMSPFilter([]string{mspID2}),
ch,
WithRefreshInterval(500*time.Millisecond),
WithResponseTimeout(2*time.Second),
)
require.NoError(t, err)
defer service.Close()

peers, err := service.GetPeers()
assert.NoError(t, err)
assert.Equal(t, 0, len(peers))

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
Endpoint: peer1MSP1,
LedgerHeight: 5,
},
},
},
)

time.Sleep(1 * time.Second)

peers, err = service.GetPeers()
assert.NoError(t, err)
assert.Equalf(t, 1, len(peers), "Expected 1 peer")

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
Endpoint: peer1MSP1,
LedgerHeight: 5,
},
{
MSPID: mspID2,
Endpoint: peer1MSP2,
LedgerHeight: 15,
},
},
},
)

time.Sleep(1 * time.Second)

//one of the peer for MSPID2 should be filtered out since it is not yet being updated by memebership cache (ContainsMSP returns false)
peers, err = service.GetPeers()
assert.NoError(t, err)
assert.Equalf(t, 1, len(peers), "Expected 1 peer among 2 been discovered, since one of them belong to new org with pending membership update")

filteredService := discovery.NewDiscoveryFilterService(service, &blockHeightFilter{minBlockHeight: 10})
peers, err = filteredService.GetPeers()
require.NoError(t, err)
require.Equalf(t, 0, len(peers), "expecting discovery filter to return only one peer")

}

func TestPickRandomNPeerConfigs(t *testing.T) {
counter := 20
allChPeers := createNChannelPeers(counter)
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/providers/fab/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type ChannelMembership interface {
Validate(serializedID []byte) error
// Verify the given signature
Verify(serializedID []byte, msg []byte, sig []byte) error
//Check is given MSP is available
ContainsMSP(msp string) bool
}

// Versions ...
Expand Down
44 changes: 34 additions & 10 deletions pkg/fab/channel/membership/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"crypto/x509"
"encoding/pem"

"strings"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/verifier"
Expand All @@ -24,6 +26,7 @@ var logger = logging.NewLogger("fabsdk/fab")

type identityImpl struct {
mspManager msp.MSPManager
msps []string
}

// Context holds the providers
Expand All @@ -34,11 +37,11 @@ type Context struct {

// New member identity
func New(ctx Context, cfg fab.ChannelCfg) (fab.ChannelMembership, error) {
m, err := createMSPManager(ctx, cfg)
mspManager, mspNames, err := createMSPManager(ctx, cfg)
if err != nil {
return nil, err
}
return &identityImpl{mspManager: m}, nil
return &identityImpl{mspManager: mspManager, msps: mspNames}, nil
}

func (i *identityImpl) Validate(serializedID []byte) error {
Expand All @@ -65,6 +68,15 @@ func (i *identityImpl) Verify(serializedID []byte, msg []byte, sig []byte) error
return id.Verify(msg, sig)
}

func (i *identityImpl) ContainsMSP(msp string) bool {
for _, v := range i.msps {
if v == strings.ToLower(msp) {
return true
}
}
return false
}

func areCertDatesValid(serializedID []byte) error {

sID := &mb.SerializedIdentity{}
Expand All @@ -89,28 +101,35 @@ func areCertDatesValid(serializedID []byte) error {
return nil
}

func createMSPManager(ctx Context, cfg fab.ChannelCfg) (msp.MSPManager, error) {
func createMSPManager(ctx Context, cfg fab.ChannelCfg) (msp.MSPManager, []string, error) {
mspManager := msp.NewMSPManager()
var mspNames []string
if len(cfg.MSPs()) > 0 {
msps, err := loadMSPs(cfg.MSPs(), ctx.CryptoSuite())
if err != nil {
return nil, errors.WithMessage(err, "load MSPs from config failed")
return nil, nil, errors.WithMessage(err, "load MSPs from config failed")
}

if err := mspManager.Setup(msps); err != nil {
return nil, errors.WithMessage(err, "MSPManager Setup failed")
return nil, nil, errors.WithMessage(err, "MSPManager Setup failed")
}
var certs [][]byte

certsByMsp := make(map[string][][]byte)
for _, msp := range msps {
certs = append(certs, msp.GetTLSRootCerts()...)
certs = append(certs, msp.GetTLSIntermediateCerts()...)
mspName, err := msp.GetIdentifier()
if err != nil {
return nil, nil, errors.WithMessage(err, "MSPManager certpool setup failed")
}
certsByMsp[mspName] = append(msp.GetTLSRootCerts(), msp.GetTLSIntermediateCerts()...)
}
if len(certs) > 0 {

for mspName, certs := range certsByMsp {
addCertsToConfig(ctx.EndpointConfig, certs)
mspNames = append(mspNames, strings.ToLower(mspName))
}
}

return mspManager, nil
return mspManager, mspNames, nil
}

func loadMSPs(mspConfigs []*mb.MSPConfig, cs core.CryptoSuite) ([]msp.MSP, error) {
Expand Down Expand Up @@ -185,6 +204,11 @@ func getFabricConfig(config *mb.MSPConfig) (*mb.FabricMSPConfig, error) {

//addCertsToConfig adds cert bytes to config TLSCACertPool
func addCertsToConfig(config fab.EndpointConfig, pemCertsList [][]byte) {

if len(pemCertsList) == 0 {
return
}

var certs []*x509.Certificate
for _, pemCerts := range pemCertsList {
for len(pemCerts) > 0 {
Expand Down
24 changes: 19 additions & 5 deletions pkg/fab/channel/membership/membership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/pem"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm/tls"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/msp"
"github.com/stretchr/testify/assert"
Expand All @@ -37,7 +38,11 @@ func TestCertSignedWithUnknownAuthority(t *testing.T) {
cfg := mocks.NewMockChannelCfg("")
// Test good config input
cfg.MockMSPs = []*mb.MSPConfig{buildMSPConfig(goodMSPID, []byte(validRootCA))}
m, err := New(Context{Providers: ctx}, cfg)
fabCertPool, err := tls.NewCertPool(false)
assert.Nil(t, err)
endpointConfig := &mocks.MockConfig{CustomTLSCACertPool: fabCertPool}

m, err := New(Context{Providers: ctx, EndpointConfig: endpointConfig}, cfg)
assert.Nil(t, err)
assert.NotNil(t, m)

Expand All @@ -51,6 +56,7 @@ func TestCertSignedWithUnknownAuthority(t *testing.T) {
if !strings.Contains(err.Error(), "certificate signed by unknown authority") {
t.Fatal("Expected error:'supplied identity is not valid: x509: certificate signed by unknown authority'")
}

}

//TestRevokedCertificate
Expand All @@ -64,7 +70,7 @@ func TestRevokedCertificate(t *testing.T) {
}
// Test good config input
cfg.MockMSPs = []*mb.MSPConfig{buildMSPConfig(goodMSPID, []byte(orgTwoCA))}
m, err := New(Context{Providers: ctx}, cfg)
m, err := New(Context{Providers: ctx, EndpointConfig: mocks.NewMockEndpointConfig()}, cfg)
assert.Nil(t, err)
assert.NotNil(t, m)

Expand All @@ -91,9 +97,13 @@ func TestCertificateDates(t *testing.T) {
if err != nil {
t.Fatalf("Error %s", err)
}
fabCertPool, err := tls.NewCertPool(false)
assert.Nil(t, err)
endpointConfig := &mocks.MockConfig{CustomTLSCACertPool: fabCertPool}

// Test good config input
cfg.MockMSPs = []*mb.MSPConfig{buildMSPConfig(goodMSPID, []byte(orgTwoCA))}
m, err := New(Context{Providers: ctx}, cfg)
m, err := New(Context{Providers: ctx, EndpointConfig: endpointConfig}, cfg)
assert.Nil(t, err)
assert.NotNil(t, m)

Expand Down Expand Up @@ -125,15 +135,19 @@ func TestNewMembership(t *testing.T) {
ctx := mocks.NewMockProviderContext()
cfg := mocks.NewMockChannelCfg("")

fabCertPool, err := tls.NewCertPool(false)
assert.Nil(t, err)
endpointConfig := &mocks.MockConfig{CustomTLSCACertPool: fabCertPool}

// Test bad config input
cfg.MockMSPs = []*mb.MSPConfig{buildMSPConfig(goodMSPID, []byte("invalid"))}
m, err := New(Context{Providers: ctx}, cfg)
m, err := New(Context{Providers: ctx, EndpointConfig: endpointConfig}, cfg)
assert.NotNil(t, err)
assert.Nil(t, m)

// Test good config input
cfg.MockMSPs = []*mb.MSPConfig{buildMSPConfig(goodMSPID, []byte(validRootCA))}
m, err = New(Context{Providers: ctx}, cfg)
m, err = New(Context{Providers: ctx, EndpointConfig: endpointConfig}, cfg)
assert.Nil(t, err)
assert.NotNil(t, m)

Expand Down
10 changes: 10 additions & 0 deletions pkg/fab/channel/membership/reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ func (ref *Ref) Verify(serializedID []byte, msg []byte, sig []byte) error {
return membership.Verify(serializedID, msg, sig)
}

// ContainsMSP checks if given MSP is available in the underlying reference
func (ref *Ref) ContainsMSP(msp string) bool {
membership, err := ref.get()
if err != nil {
logger.Debugf("Failed to check ContainsMSP, err: %s", err)
return false
}
return membership.ContainsMSP(msp)
}

func (ref *Ref) get() (fab.ChannelMembership, error) {
m, err := ref.Get()
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/fab/mocks/mockchprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type MockChannelService struct {
mockOrderers []string
discovery fab.DiscoveryService
selection fab.SelectionService
membership fab.ChannelMembership
}

// NewMockChannelProvider returns a mock ChannelProvider
Expand Down Expand Up @@ -91,9 +92,17 @@ func (cs *MockChannelService) Config() (fab.ChannelConfig, error) {

// Membership returns member identification
func (cs *MockChannelService) Membership() (fab.ChannelMembership, error) {
if cs.membership != nil {
return cs.membership, nil
}
return NewMockMembership(), nil
}

//SetCustomMembership sets custom channel membership for unit-test purposes
func (cs *MockChannelService) SetCustomMembership(customMembership fab.ChannelMembership) {
cs.membership = customMembership
}

//ChannelConfig returns channel config
func (cs *MockChannelService) ChannelConfig() (fab.ChannelCfg, error) {
return &MockChannelCfg{MockID: cs.channelID, MockOrderers: cs.mockOrderers}, nil
Expand Down
Loading

0 comments on commit 0424521

Please sign in to comment.