From 53a4bd0d408eb9137a815fdf098c4c6465e1e8fd Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Fri, 9 Mar 2018 13:40:24 -0500 Subject: [PATCH] [FAB-8610] Integrate with latest Client Context - Integrate with the latest context.Client which includes the InfraProvider and DiscoveryProvider - Remove RegisterConnectionEvent from the EventClient interface since this is an option when creating an events client Change-Id: Id96646eb43d2cbe2240687c257bb5964a0e8d89b Signed-off-by: Bob Stasyszyn --- pkg/context/api/fab/eventservice.go | 5 - pkg/fab/comm/connection.go | 17 +- pkg/fab/comm/connection_test.go | 12 +- pkg/fab/events/api/connection.go | 2 +- pkg/fab/events/client/client.go | 14 +- pkg/fab/events/client/client_test.go | 146 +++++++++++------- .../events/client/dispatcher/dispatcher.go | 28 ++-- .../client/dispatcher/dispatcher_test.go | 32 ++-- pkg/fab/events/client/dispatcher/events.go | 17 +- .../events/client/dispatcher/registrations.go | 4 +- pkg/fab/events/client/mocks/mockconnection.go | 4 +- pkg/fab/events/client/mocks/mockdiscovery.go | 22 ++- pkg/fab/events/client/opts.go | 10 +- .../deliverclient/connection/connection.go | 11 +- .../connection/connection_test.go | 31 +--- pkg/fab/events/deliverclient/deliverclient.go | 16 +- .../deliverclient/deliverclient_test.go | 59 ++++--- .../deliverclient/dispatcher/dispatcher.go | 6 +- .../dispatcher/dispatcher_test.go | 35 +++-- .../eventhubclient/connection/connection.go | 9 +- .../connection/connection_test.go | 11 +- .../eventhubclient/discoveryprovider.go | 82 ++++++++++ .../eventhubclient/dispatcher/dispatcher.go | 6 +- .../dispatcher/dispatcher_test.go | 35 +++-- .../events/eventhubclient/eventhubclient.go | 33 +++- .../eventhubclient/eventhubclient_test.go | 87 +++++++---- 26 files changed, 463 insertions(+), 271 deletions(-) create mode 100644 pkg/fab/events/eventhubclient/discoveryprovider.go diff --git a/pkg/context/api/fab/eventservice.go b/pkg/context/api/fab/eventservice.go index a004087d0f..06d63996d1 100755 --- a/pkg/context/api/fab/eventservice.go +++ b/pkg/context/api/fab/eventservice.go @@ -100,9 +100,4 @@ type EventClient interface { // Close closes the connection to the event server and releases all resources. // Once this function is invoked the client may no longer be used. Close() - - // RegisterConnectionEvent registers a connection event. The returned - // ConnectionEvent channel is called whenever the client clients to - // or disconnects from the event server - RegisterConnectionEvent() (Registration, chan ConnectionEvent, error) } diff --git a/pkg/fab/comm/connection.go b/pkg/fab/comm/connection.go index 01f8f671d4..0bbd400713 100755 --- a/pkg/fab/comm/connection.go +++ b/pkg/fab/comm/connection.go @@ -15,6 +15,7 @@ import ( fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm" "github.com/hyperledger/fabric-sdk-go/pkg/core/config/urlutil" "github.com/hyperledger/fabric-sdk-go/pkg/logging" @@ -29,16 +30,16 @@ type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error) // GRPCConnection manages the GRPC connection and client stream type GRPCConnection struct { - channelID string + context fabcontext.Client + chConfig fab.ChannelCfg conn *grpc.ClientConn stream grpc.ClientStream - context fabcontext.Client tlsCertHash []byte done int32 } // NewConnection creates a new connection -func NewConnection(ctx fabcontext.Client, channelID string, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) { +func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) { if url == "" { return nil, errors.New("server URL not specified") } @@ -73,17 +74,17 @@ func NewConnection(ctx fabcontext.Client, channelID string, streamProvider Strea } return &GRPCConnection{ - channelID: channelID, + context: ctx, + chConfig: chConfig, conn: grpcconn, stream: stream, - context: ctx, tlsCertHash: comm.TLSCertHash(ctx.Config()), }, nil } -// ChannelID returns the ID of the channel -func (c *GRPCConnection) ChannelID() string { - return c.channelID +// ChannelConfig returns the channel configuration +func (c *GRPCConnection) ChannelConfig() fab.ChannelCfg { + return c.chConfig } // Close closes the connection diff --git a/pkg/fab/comm/connection_test.go b/pkg/fab/comm/connection_test.go index 584dfc0700..7f6b63a71d 100755 --- a/pkg/fab/comm/connection_test.go +++ b/pkg/fab/comm/connection_test.go @@ -35,12 +35,13 @@ func TestConnection(t *testing.T) { channelID := "testchannel" context := newMockContext() + chConfig := fabmocks.NewMockChannelCfg(channelID) - conn, err := NewConnection(context, channelID, testStream, "") + conn, err := NewConnection(context, chConfig, testStream, "") if err == nil { t.Fatalf("expected error creating new connection with empty URL") } - conn, err = NewConnection(context, channelID, testStream, "invalidhost:0000", + conn, err = NewConnection(context, chConfig, testStream, "invalidhost:0000", WithFailFast(true), WithCertificate(nil), WithHostOverride(""), @@ -50,21 +51,18 @@ func TestConnection(t *testing.T) { if err == nil { t.Fatalf("expected error creating new connection with invalid URL") } - conn, err = NewConnection(context, channelID, invalidStream, peerURL) + conn, err = NewConnection(context, chConfig, invalidStream, peerURL) if err == nil { t.Fatalf("expected error creating new connection with invalid stream but got none") } - conn, err = NewConnection(context, channelID, testStream, peerURL) + conn, err = NewConnection(context, chConfig, testStream, peerURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } if conn.Closed() { t.Fatalf("expected connection to be open") } - if conn.ChannelID() != channelID { - t.Fatalf("expected channel ID [%s] but got [%s]", channelID, conn.ChannelID()) - } if conn.Stream() == nil { t.Fatalf("got invalid stream") } diff --git a/pkg/fab/events/api/connection.go b/pkg/fab/events/api/connection.go index 9261db7616..d763de162a 100644 --- a/pkg/fab/events/api/connection.go +++ b/pkg/fab/events/api/connection.go @@ -22,4 +22,4 @@ type Connection interface { } // ConnectionProvider creates a Connection. -type ConnectionProvider func(channelID string, context context.Client, peer fab.Peer) (Connection, error) +type ConnectionProvider func(context context.Client, chConfig fab.ChannelCfg, peer fab.Peer) (Connection, error) diff --git a/pkg/fab/events/client/client.go b/pkg/fab/events/client/client.go index 6a4aa3e81e..65763658d0 100755 --- a/pkg/fab/events/client/client.go +++ b/pkg/fab/events/client/client.go @@ -40,7 +40,7 @@ type Client struct { eventservice.Service params sync.RWMutex - connEvent chan *fab.ConnectionEvent + connEvent chan *dispatcher.ConnectionEvent connectionState int32 stopped int32 registerOnce sync.Once @@ -163,7 +163,7 @@ func (c *Client) connect() error { c.registerOnce.Do(func() { logger.Debugf("Submitting connection event registration...") - _, eventch, err := c.RegisterConnectionEvent() + _, eventch, err := c.registerConnectionEvent() if err != nil { logger.Errorf("Error registering for connection events: %s", err) c.Close() @@ -239,15 +239,15 @@ func (c *Client) RegisterBlockEvent(filter ...fab.BlockFilter) (fab.Registration return c.Service.RegisterBlockEvent(filter...) } -// RegisterConnectionEvent registers a connection event. The returned +// registerConnectionEvent registers a connection event. The returned // ConnectionEvent channel will be called whenever the client clients or disconnects // from the event server -func (c *Client) RegisterConnectionEvent() (fab.Registration, chan *fab.ConnectionEvent, error) { +func (c *Client) registerConnectionEvent() (fab.Registration, chan *dispatcher.ConnectionEvent, error) { if c.Stopped() { return nil, nil, errors.New("event client is closed") } - eventch := make(chan *fab.ConnectionEvent, c.eventConsumerBufferSize) + eventch := make(chan *dispatcher.ConnectionEvent, c.eventConsumerBufferSize) errch := make(chan error) regch := make(chan fab.Registration) c.Submit(dispatcher.NewRegisterConnectionEvent(eventch, regch, errch)) @@ -348,13 +348,13 @@ func (c *Client) closeConnectEventChan() { } } -func (c *Client) connectEventChan() chan *fab.ConnectionEvent { +func (c *Client) connectEventChan() chan *dispatcher.ConnectionEvent { c.RLock() defer c.RUnlock() return c.connEventCh } -func (c *Client) notifyConnectEventChan(event *fab.ConnectionEvent) { +func (c *Client) notifyConnectEventChan(event *dispatcher.ConnectionEvent) { c.RLock() defer c.RUnlock() if c.connEventCh != nil { diff --git a/pkg/fab/events/client/client_test.go b/pkg/fab/events/client/client_test.go index d3832642ed..e67faac40e 100755 --- a/pkg/fab/events/client/client_test.go +++ b/pkg/fab/events/client/client_test.go @@ -46,8 +46,14 @@ func TestConnect(t *testing.T) { ), ) - discoveryService := clientmocks.CreateDiscoveryService(peer1, peer2) - eventClient, _, err := newClientWithMockConnAndOpts("mychannel", newMockContext(), connectionProvider, filteredClientProvider, discoveryService, []options.Opt{}) + eventClient, _, err := newClientWithMockConnAndOpts( + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), + connectionProvider, filteredClientProvider, []options.Opt{}, + ) if err != nil { t.Fatalf("error creating channel event client: %s", err) } @@ -75,15 +81,17 @@ func TestConnect(t *testing.T) { func TestFailConnect(t *testing.T) { eventClient, _, err := newClientWithMockConnAndOpts( - "mychannel", newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), mockconn.NewProviderFactory().Provider( mockconn.NewMockConnection( mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ), ), - failAfterConnectClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), - []options.Opt{}, + failAfterConnectClientProvider, []options.Opt{}, ) if err != nil { t.Fatalf("error creating client: %s", err) @@ -95,9 +103,12 @@ func TestFailConnect(t *testing.T) { func TestCallsOnClosedClient(t *testing.T) { eventClient, _, err := newClientWithMockConn( - "mychannel", newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), filteredClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ) if err != nil { @@ -115,10 +126,6 @@ func TestCallsOnClosedClient(t *testing.T) { t.Fatalf("expecting error connecting to closed channel event client but got none") } - if _, _, err := eventClient.RegisterConnectionEvent(); err == nil { - t.Fatalf("expecting error registering for connection events on closed channel event client but got none") - } - if _, _, err := eventClient.RegisterFilteredBlockEvent(); err == nil { t.Fatalf("expecting error registering for block events on closed channel event client but got none") } @@ -138,9 +145,12 @@ func TestCallsOnClosedClient(t *testing.T) { func TestInvalidUnregister(t *testing.T) { channelID := "mychannel" eventClient, _, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), filteredClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ) if err != nil { @@ -158,9 +168,12 @@ func TestInvalidUnregister(t *testing.T) { func TestUnauthorizedBlockEvents(t *testing.T) { channelID := "mychannel" eventClient, _, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), filteredClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ) if err != nil { @@ -179,9 +192,12 @@ func TestUnauthorizedBlockEvents(t *testing.T) { func TestBlockEvents(t *testing.T) { channelID := "mychannel" eventClient, conn, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), ) if err != nil { @@ -234,9 +250,12 @@ func TestBlockEvents(t *testing.T) { func TestFilteredBlockEvents(t *testing.T) { channelID := "mychannel" eventClient, conn, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), filteredClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ) if err != nil { @@ -308,9 +327,12 @@ func TestFilteredBlockEvents(t *testing.T) { func TestBlockAndFilteredBlockEvents(t *testing.T) { channelID := "mychannel" eventClient, conn, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), ) if err != nil { @@ -383,9 +405,12 @@ func TestBlockAndFilteredBlockEvents(t *testing.T) { func TestTxStatusEvents(t *testing.T) { channelID := "mychannel" eventClient, conn, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), filteredClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ) if err != nil { @@ -464,9 +489,12 @@ func TestTxStatusEvents(t *testing.T) { func TestCCEvents(t *testing.T) { channelID := "mychannel" eventClient, conn, err := newClientWithMockConn( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), filteredClientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)), ) if err != nil { @@ -658,9 +686,12 @@ func TestConcurrentEvents(t *testing.T) { ccFilter := "event.*" eventClient, conn, err := newClientWithMockConnAndOpts( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), nil, clientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), []options.Opt{ esdispatcher.WithEventConsumerBufferSize(uint(numEvents) * 4), }, @@ -869,14 +900,13 @@ func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID str ) select { - case txStatus, ok := <-txeventch: + case _, ok := <-txeventch: mutex.Lock() if !ok { errs = append(errs, errors.New("unexpected closed channel")) } else { receivedEvents++ } - fmt.Printf("received TxStatus %#v\n", txStatus) mutex.Unlock() case <-time.After(5 * time.Second): mutex.Lock() @@ -899,10 +929,13 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome mockconn cp := mockconn.NewProviderFactory() eventClient, _, err := newClientWithMockConnAndOpts( - "mychannel", newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), cp.FlakeyProvider(connAttemptResult, mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory))), clientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), []options.Opt{ esdispatcher.WithEventConsumerTimeout(time.Second), WithMaxConnectAttempts(maxConnectAttempts), @@ -928,15 +961,18 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome mockconn func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expectedOutcome mockconn.Outcome, connAttemptResult mockconn.ConnectAttemptResults) { cp := mockconn.NewProviderFactory() - connectch := make(chan *fab.ConnectionEvent) + connectch := make(chan *dispatcher.ConnectionEvent) ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory) eventClient, _, err := newClientWithMockConnAndOpts( - "mychannel", newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), cp.FlakeyProvider(connAttemptResult, mockconn.WithLedger(ledger)), clientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), []options.Opt{ esdispatcher.WithEventConsumerTimeout(3 * time.Second), WithMaxConnectAttempts(1), @@ -987,10 +1023,13 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents mockconn.NumBlo cp := mockconn.NewProviderFactory() eventClient, _, err := newClientWithMockConnAndOpts( - channelID, newMockContext(), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg("mychannel"), cp.FlakeyProvider(connectResults, mockconn.WithLedger(ledger)), clientProvider, - clientmocks.CreateDiscoveryService(peer1, peer2), []options.Opt{ esdispatcher.WithEventConsumerTimeout(3 * time.Second), WithMaxConnectAttempts(1), @@ -1080,7 +1119,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents mockconn.NumBlo } } -func listenConnection(eventch chan *fab.ConnectionEvent, outcome chan mockconn.Outcome) { +func listenConnection(eventch chan *dispatcher.ConnectionEvent, outcome chan mockconn.Outcome) { state := InitialState for { @@ -1136,10 +1175,10 @@ func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, wait } } -type ClientProvider func(channelID string, context context.Client, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts []options.Opt) (*Client, error) +type ClientProvider func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) -var clientProvider = func(channelID string, context context.Client, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts []options.Opt) (*Client, error) { - return newClient(channelID, context, connectionProvider, discoveryService, opts, true, +var clientProvider = func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) { + return newClient(context, chConfig, connectionProvider, opts, true, func() error { fmt.Printf("AfterConnect called") return nil @@ -1150,16 +1189,16 @@ var clientProvider = func(channelID string, context context.Client, connectionPr }) } -var failAfterConnectClientProvider = func(channelID string, context context.Client, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts []options.Opt) (*Client, error) { - return newClient(channelID, context, connectionProvider, discoveryService, opts, true, +var failAfterConnectClientProvider = func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) { + return newClient(context, chConfig, connectionProvider, opts, true, func() error { return errors.New("simulated failure after connect") }, nil) } -var filteredClientProvider = func(channelID string, context context.Client, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts []options.Opt) (*Client, error) { - return newClient(channelID, context, connectionProvider, discoveryService, opts, false, +var filteredClientProvider = func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) { + return newClient(context, chConfig, connectionProvider, opts, false, func() error { fmt.Printf("AfterConnect called") return nil @@ -1170,13 +1209,12 @@ var filteredClientProvider = func(channelID string, context context.Client, conn }) } -func newClient(channelID string, context context.Client, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts []options.Opt, permitBlockEvents bool, afterConnect handler, beforeReconnect handler) (*Client, error) { +func newClient(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt, permitBlockEvents bool, afterConnect handler, beforeReconnect handler) (*Client, error) { client := New( permitBlockEvents, dispatcher.New( - context, channelID, + context, chConfig, connectionProvider, - discoveryService, opts..., ), opts..., @@ -1190,19 +1228,19 @@ func newClient(channelID string, context context.Client, connectionProvider api. return client, nil } -func newClientWithMockConn(channelID string, context context.Client, clientProvider ClientProvider, discoveryService fab.DiscoveryService, connOpts ...mockconn.Opt) (*Client, mockconn.Connection, error) { +func newClientWithMockConn(context context.Client, chConfig fab.ChannelCfg, clientProvider ClientProvider, connOpts ...mockconn.Opt) (*Client, mockconn.Connection, error) { conn := mockconn.NewMockConnection(connOpts...) - client, _, err := newClientWithMockConnAndOpts(channelID, context, mockconn.NewProviderFactory().Provider(conn), clientProvider, discoveryService, []options.Opt{}) + client, _, err := newClientWithMockConnAndOpts(context, chConfig, mockconn.NewProviderFactory().Provider(conn), clientProvider, []options.Opt{}) return client, conn, err } -func newClientWithMockConnAndOpts(channelID string, context context.Client, connectionProvider api.ConnectionProvider, clientProvider ClientProvider, discoveryService fab.DiscoveryService, opts []options.Opt, connOpts ...mockconn.Opt) (*Client, mockconn.Connection, error) { +func newClientWithMockConnAndOpts(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, clientProvider ClientProvider, opts []options.Opt, connOpts ...mockconn.Opt) (*Client, mockconn.Connection, error) { var conn mockconn.Connection if connectionProvider == nil { conn = mockconn.NewMockConnection(connOpts...) connectionProvider = mockconn.NewProviderFactory().Provider(conn) } - client, err := clientProvider(channelID, context, connectionProvider, discoveryService, opts) + client, err := clientProvider(context, chConfig, connectionProvider, opts) return client, conn, err } @@ -1258,7 +1296,3 @@ func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expecte t.Fatalf("expecting one of [%v] but received [%s]", expectedEventNames, event.EventName) } } - -func newMockContext() context.Client { - return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) -} diff --git a/pkg/fab/events/client/dispatcher/dispatcher.go b/pkg/fab/events/client/dispatcher/dispatcher.go index b43e136468..5a645a56b3 100755 --- a/pkg/fab/events/client/dispatcher/dispatcher.go +++ b/pkg/fab/events/client/dispatcher/dispatcher.go @@ -29,9 +29,8 @@ var logger = logging.NewLogger("fabsdk/fab") type Dispatcher struct { esdispatcher.Dispatcher params - channelID string context context.Client - discoveryService fab.DiscoveryService + chConfig fab.ChannelCfg signingMgr core.SigningManager connection api.Connection connectionRegistration *ConnectionReg @@ -41,7 +40,7 @@ type Dispatcher struct { type handler func(esdispatcher.Event) // New creates a new dispatcher -func New(context context.Client, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) *Dispatcher { +func New(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher { params := defaultParams() options.Apply(params, opts) @@ -49,8 +48,7 @@ func New(context context.Client, channelID string, connectionProvider api.Connec Dispatcher: *esdispatcher.New(opts...), params: *params, context: context, - discoveryService: discoveryService, - channelID: channelID, + chConfig: chConfig, connectionProvider: connectionProvider, } } @@ -65,9 +63,9 @@ func (ed *Dispatcher) Start() error { return nil } -// ChannelID returns the channel ID -func (ed *Dispatcher) ChannelID() string { - return ed.channelID +// ChannelConfig returns the channel configuration +func (ed *Dispatcher) ChannelConfig() fab.ChannelCfg { + return ed.chConfig } // Connection returns the connection to the event server @@ -101,7 +99,13 @@ func (ed *Dispatcher) HandleConnectEvent(e esdispatcher.Event) { return } - peers, err := ed.discoveryService.GetPeers() + discoveryService, err := ed.context.DiscoveryProvider().CreateDiscoveryService(ed.chConfig.Name()) + if err != nil { + evt.ErrCh <- nil + return + } + + peers, err := discoveryService.GetPeers() if err != nil { evt.ErrCh <- err return @@ -118,7 +122,7 @@ func (ed *Dispatcher) HandleConnectEvent(e esdispatcher.Event) { return } - conn, err := ed.connectionProvider(ed.channelID, ed.context, peer) + conn, err := ed.connectionProvider(ed.context, ed.chConfig, peer) if err != nil { logger.Warnf("error creating connection: %s", err) evt.ErrCh <- errors.WithMessage(err, fmt.Sprintf("could not create client conn")) @@ -170,7 +174,7 @@ func (ed *Dispatcher) HandleConnectedEvent(e esdispatcher.Event) { if ed.connectionRegistration != nil && ed.connectionRegistration.Eventch != nil { select { - case ed.connectionRegistration.Eventch <- &fab.ConnectionEvent{Connected: true}: + case ed.connectionRegistration.Eventch <- NewConnectionEvent(true, nil): default: logger.Warnf("Unable to send to connection event channel.") } @@ -191,7 +195,7 @@ func (ed *Dispatcher) HandleDisconnectedEvent(e esdispatcher.Event) { if ed.connectionRegistration != nil { logger.Debugf("Disconnected from event server: %s", evt.Err) select { - case ed.connectionRegistration.Eventch <- &fab.ConnectionEvent{Connected: false, Err: evt.Err}: + case ed.connectionRegistration.Eventch <- NewConnectionEvent(false, evt.Err): default: logger.Warnf("Unable to send to connection event channel.") } diff --git a/pkg/fab/events/client/dispatcher/dispatcher_test.go b/pkg/fab/events/client/dispatcher/dispatcher_test.go index 198a48945f..0cc32f2468 100755 --- a/pkg/fab/events/client/dispatcher/dispatcher_test.go +++ b/pkg/fab/events/client/dispatcher/dispatcher_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" @@ -30,7 +29,11 @@ func TestConnect(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( clientmocks.NewMockConnection( clientmocks.WithLedger( @@ -38,12 +41,11 @@ func TestConnect(t *testing.T) { ), ), ), - clientmocks.CreateDiscoveryService(peer1, peer2), WithLoadBalancePolicy(lbp.NewRandom()), ) - if dispatcher.ChannelID() != channelID { - t.Fatalf("Expecting channel ID [%s] but got [%s]", channelID, dispatcher.ChannelID()) + if dispatcher.ChannelConfig().Name() != channelID { + t.Fatalf("Expecting channel ID [%s] but got [%s]", channelID, dispatcher.ChannelConfig().Name()) } if err := dispatcher.Start(); err != nil { @@ -106,7 +108,11 @@ func TestConnectNoPeers(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(), // Add no peers to discovery service + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( clientmocks.NewMockConnection( clientmocks.WithLedger( @@ -114,7 +120,6 @@ func TestConnectNoPeers(t *testing.T) { ), ), ), - clientmocks.CreateDiscoveryService(), // Add no peers to discovery service ) if err := dispatcher.Start(); err != nil { @@ -146,7 +151,11 @@ func TestConnectionEvent(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( clientmocks.NewMockConnection( clientmocks.WithLedger( @@ -154,7 +163,6 @@ func TestConnectionEvent(t *testing.T) { ), ), ), - clientmocks.CreateDiscoveryService(peer1, peer2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -168,7 +176,7 @@ func TestConnectionEvent(t *testing.T) { expectedDisconnectErr := "simulated disconnect error" // Register connection event - connch := make(chan *fab.ConnectionEvent, 10) + connch := make(chan *ConnectionEvent, 10) errch := make(chan error) state := "" go func() { @@ -240,7 +248,3 @@ func TestConnectionEvent(t *testing.T) { t.Fatal(err.Error()) } } - -func newMockContext() context.Client { - return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) -} diff --git a/pkg/fab/events/client/dispatcher/events.go b/pkg/fab/events/client/dispatcher/events.go index fd43a6034a..269e62ab44 100755 --- a/pkg/fab/events/client/dispatcher/events.go +++ b/pkg/fab/events/client/dispatcher/events.go @@ -18,7 +18,7 @@ type RegisterConnectionEvent struct { } // NewRegisterConnectionEvent creates a new RegisterConnectionEvent -func NewRegisterConnectionEvent(eventch chan<- *fab.ConnectionEvent, regch chan<- fab.Registration, errch chan<- error) *RegisterConnectionEvent { +func NewRegisterConnectionEvent(eventch chan<- *ConnectionEvent, regch chan<- fab.Registration, errch chan<- error) *RegisterConnectionEvent { return &RegisterConnectionEvent{ Reg: &ConnectionReg{Eventch: eventch}, RegisterEvent: esdispatcher.NewRegisterEvent(regch, errch), @@ -64,3 +64,18 @@ type DisconnectEvent struct { func NewDisconnectEvent(errch chan<- error) *DisconnectEvent { return &DisconnectEvent{Errch: errch} } + +// ConnectionEvent is sent when the client disconnects from or +// reconnects to the event server. Connected == true means that the +// client has connected, whereas Connected == false means that the +// client has disconnected. In the disconnected case, Err contains +// the disconnect error. +type ConnectionEvent struct { + Connected bool + Err error +} + +// NewConnectionEvent returns a new ConnectionEvent +func NewConnectionEvent(connected bool, err error) *ConnectionEvent { + return &ConnectionEvent{Connected: connected, Err: err} +} diff --git a/pkg/fab/events/client/dispatcher/registrations.go b/pkg/fab/events/client/dispatcher/registrations.go index 3bb388bb26..20c7097675 100755 --- a/pkg/fab/events/client/dispatcher/registrations.go +++ b/pkg/fab/events/client/dispatcher/registrations.go @@ -6,9 +6,7 @@ SPDX-License-Identifier: Apache-2.0 package dispatcher -import "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - // ConnectionReg is a connection registration type ConnectionReg struct { - Eventch chan<- *fab.ConnectionEvent + Eventch chan<- *ConnectionEvent } diff --git a/pkg/fab/events/client/mocks/mockconnection.go b/pkg/fab/events/client/mocks/mockconnection.go index de47ebd60d..39f56b159d 100755 --- a/pkg/fab/events/client/mocks/mockconnection.go +++ b/pkg/fab/events/client/mocks/mockconnection.go @@ -198,7 +198,7 @@ func (cp *ProviderFactory) Connection() Connection { // Provider returns a connection provider that always returns the given connection func (cp *ProviderFactory) Provider(conn Connection) api.ConnectionProvider { - return func(string, context.Client, fab.Peer) (api.Connection, error) { + return func(context.Client, fab.ChannelCfg, fab.Peer) (api.Connection, error) { return conn, nil } } @@ -208,7 +208,7 @@ func (cp *ProviderFactory) Provider(conn Connection) api.ConnectionProvider { // to return a connection, what authorization to give the connection, etc. func (cp *ProviderFactory) FlakeyProvider(connAttemptResults ConnectAttemptResults, opts ...Opt) api.ConnectionProvider { var connectAttempt Attempt - return func(string, context.Client, fab.Peer) (api.Connection, error) { + return func(context.Client, fab.ChannelCfg, fab.Peer) (api.Connection, error) { connectAttempt++ _, ok := connAttemptResults[connectAttempt] diff --git a/pkg/fab/events/client/mocks/mockdiscovery.go b/pkg/fab/events/client/mocks/mockdiscovery.go index b32bf046ca..21be1715c3 100644 --- a/pkg/fab/events/client/mocks/mockdiscovery.go +++ b/pkg/fab/events/client/mocks/mockdiscovery.go @@ -10,18 +10,30 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" ) -// MockDiscoveryService is a mock discovery service used for event endpoint discovery -type MockDiscoveryService struct { +// MockDiscoveryProvider mocks out the discovery provider +type MockDiscoveryProvider struct { peers []fab.Peer } -// CreateDiscoveryService returns a new mock discovery service -func CreateDiscoveryService(peers ...fab.Peer) fab.DiscoveryService { - return &MockDiscoveryService{ +// NewDiscoveryProvider returns a new MockDiscoveryProvider +func NewDiscoveryProvider(peers ...fab.Peer) fab.DiscoveryProvider { + return &MockDiscoveryProvider{ peers: peers, } } +// CreateDiscoveryService returns a new MockDiscoveryService +func (p *MockDiscoveryProvider) CreateDiscoveryService(channelID string) (fab.DiscoveryService, error) { + return &MockDiscoveryService{ + peers: p.peers, + }, nil +} + +// MockDiscoveryService is a mock discovery service used for event endpoint discovery +type MockDiscoveryService struct { + peers []fab.Peer +} + // GetPeers returns a list of discovered peers func (s *MockDiscoveryService) GetPeers() ([]fab.Peer, error) { return s.peers, nil diff --git a/pkg/fab/events/client/opts.go b/pkg/fab/events/client/opts.go index 9dc12a78d5..61c5e3c240 100755 --- a/pkg/fab/events/client/opts.go +++ b/pkg/fab/events/client/opts.go @@ -10,7 +10,7 @@ import ( "time" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" ) type params struct { @@ -20,7 +20,7 @@ type params struct { maxReconnAttempts uint reconnInitialDelay time.Duration timeBetweenConnAttempts time.Duration - connEventCh chan *fab.ConnectionEvent + connEventCh chan *dispatcher.ConnectionEvent respTimeout time.Duration } @@ -78,7 +78,7 @@ func WithReconnectInitialDelay(value time.Duration) options.Opt { // WithConnectionEvent sets the channel that is to receive connection events, i.e. when the client connects and/or // disconnects from the channel event service. -func WithConnectionEvent(value chan *fab.ConnectionEvent) options.Opt { +func WithConnectionEvent(value chan *dispatcher.ConnectionEvent) options.Opt { return func(p options.Params) { if setter, ok := p.(connectEventChSetter); ok { setter.SetConnectEventCh(value) @@ -133,7 +133,7 @@ func (p *params) SetTimeBetweenConnectAttempts(value time.Duration) { p.timeBetweenConnAttempts = value } -func (p *params) SetConnectEventCh(value chan *fab.ConnectionEvent) { +func (p *params) SetConnectEventCh(value chan *dispatcher.ConnectionEvent) { logger.Debugf("ConnectEventCh: %#v", value) p.connEventCh = value } @@ -160,7 +160,7 @@ type reconnectInitialDelaySetter interface { } type connectEventChSetter interface { - SetConnectEventCh(value chan *fab.ConnectionEvent) + SetConnectEventCh(value chan *dispatcher.ConnectionEvent) } type timeBetweenConnectAttemptsSetter interface { diff --git a/pkg/fab/events/deliverclient/connection/connection.go b/pkg/fab/events/deliverclient/connection/connection.go index 51aac54698..639266e641 100755 --- a/pkg/fab/events/deliverclient/connection/connection.go +++ b/pkg/fab/events/deliverclient/connection/connection.go @@ -16,6 +16,7 @@ import ( "github.com/golang/protobuf/proto" fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "github.com/hyperledger/fabric-sdk-go/pkg/fab/comm" clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" "github.com/hyperledger/fabric-sdk-go/pkg/logging" @@ -57,13 +58,9 @@ var ( ) // New returns a new Deliver Server connection -func New(ctx fabcontext.Client, channelID string, streamProvider StreamProvider, url string, opts ...options.Opt) (*DeliverConnection, error) { - if channelID == "" { - return nil, errors.New("channel ID not provided") - } - +func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*DeliverConnection, error) { connect, err := comm.NewConnection( - ctx, channelID, + ctx, chConfig, func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) { return streamProvider(pb.NewDeliverClient(grpcconn)) }, @@ -145,7 +142,7 @@ func (c *DeliverConnection) createSignedEnvelope(msg proto.Message) (*cb.Envelop var msgVersion int32 var epoch uint64 - payloadChannelHeader := utils.MakeChannelHeader(cb.HeaderType_DELIVER_SEEK_INFO, msgVersion, c.ChannelID(), epoch) + payloadChannelHeader := utils.MakeChannelHeader(cb.HeaderType_DELIVER_SEEK_INFO, msgVersion, c.ChannelConfig().Name(), epoch) payloadChannelHeader.TlsCertHash = c.TLSCertHash() data, err := proto.Marshal(msg) diff --git a/pkg/fab/events/deliverclient/connection/connection_test.go b/pkg/fab/events/deliverclient/connection/connection_test.go index ba49970b8f..693a117a58 100755 --- a/pkg/fab/events/deliverclient/connection/connection_test.go +++ b/pkg/fab/events/deliverclient/connection/connection_test.go @@ -42,17 +42,14 @@ var ( ) func TestInvalidConnectionOpts(t *testing.T) { - if _, err := New(newMockContext(), "", Deliver, peerURL); err == nil { - t.Fatalf("expecting error creating new connection without channel but got none") - } - if _, err := New(newMockContext(), "channelid", Deliver, "grpcs://invalidhost:7051"); err == nil { + if _, err := New(newMockContext(), fabmocks.NewMockChannelCfg("mychannel"), Deliver, "grpcs://invalidhost:7051"); err == nil { t.Fatalf("expecting error creating new connection with invaid address but got none") } } func TestConnection(t *testing.T) { channelID := "mychannel" - conn, err := New(newMockContext(), channelID, Deliver, peerURL, + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), Deliver, peerURL, comm.WithConnectTimeout(3*time.Second), comm.WithFailFast(true), comm.WithKeepAliveParams( @@ -79,7 +76,7 @@ func TestForbiddenConnection(t *testing.T) { defer deliverServer.SetStatus(cb.Status_UNKNOWN) channelID := "mychannel" - conn, err := New(newMockContext(), channelID, Deliver, peerURL, + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), Deliver, peerURL, comm.WithConnectTimeout(3*time.Second), comm.WithFailFast(true), comm.WithKeepAliveParams( @@ -94,23 +91,9 @@ func TestForbiddenConnection(t *testing.T) { t.Fatalf("error creating new connection: %s", err) } - eventch := make(chan interface{}) - - go conn.Receive(eventch) - - select { - case e, ok := <-eventch: - if !ok { - t.Fatalf("unexpected closed connection") - } - statusResponse := e.(*pb.DeliverResponse).Type.(*pb.DeliverResponse_Status) - if statusResponse.Status != expectedStatus { - t.Fatalf("expecting status %s but got %s", expectedStatus, statusResponse.Status) - } - case <-time.After(5 * time.Second): - t.Fatalf("timed out waiting for event") - } + conn.Close() + // Calling close again should be ignored conn.Close() } @@ -125,7 +108,7 @@ func TestSend(t *testing.T) { func TestDisconnected(t *testing.T) { channelID := "mychannel" - conn, err := New(newMockContext(), channelID, Deliver, peerURL) + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), Deliver, peerURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } @@ -165,7 +148,7 @@ func getStreamProvider(streamType streamType) StreamProvider { func testSend(t *testing.T, streamType streamType) { channelID := "mychannel" - conn, err := New(newMockContext(), channelID, getStreamProvider(streamType), peerURL) + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), getStreamProvider(streamType), peerURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } diff --git a/pkg/fab/events/deliverclient/deliverclient.go b/pkg/fab/events/deliverclient/deliverclient.go index 2c305197e5..cc740b09f6 100755 --- a/pkg/fab/events/deliverclient/deliverclient.go +++ b/pkg/fab/events/deliverclient/deliverclient.go @@ -26,13 +26,13 @@ import ( var logger = logging.NewLogger("fabsdk/fab") // deliverProvider is the connection provider used for connecting to the Deliver service -var deliverProvider = func(channelID string, context fabcontext.Client, peer fab.Peer) (api.Connection, error) { - return deliverconn.New(context, channelID, deliverconn.Deliver, peer.URL()) +var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) { + return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL()) } // deliverFilteredProvider is the connection provider used for connecting to the DeliverFiltered service -var deliverFilteredProvider = func(channelID string, context fabcontext.Client, peer fab.Peer) (api.Connection, error) { - return deliverconn.New(context, channelID, deliverconn.DeliverFiltered, peer.URL()) +var deliverFilteredProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) { + return deliverconn.New(context, chConfig, deliverconn.DeliverFiltered, peer.URL()) } // Client connects to a peer and receives channel events, such as bock, filtered block, chaincode, and transaction status events. @@ -42,18 +42,14 @@ type Client struct { } // New returns a new deliver event client -func New(context fabcontext.Client, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) { - if channelID == "" { - return nil, errors.New("expecting channel ID") - } - +func New(context fabcontext.Client, chConfig fab.ChannelCfg, opts ...options.Opt) (*Client, error) { params := defaultParams() options.Apply(params, opts) client := &Client{ Client: *client.New( params.permitBlockEvents, - dispatcher.New(context, channelID, params.connProvider, discoveryService, opts...), + dispatcher.New(context, chConfig, params.connProvider, opts...), opts..., ), params: *params, diff --git a/pkg/fab/events/deliverclient/deliverclient_test.go b/pkg/fab/events/deliverclient/deliverclient_test.go index 3d8c0e2d51..934a231caf 100755 --- a/pkg/fab/events/deliverclient/deliverclient_test.go +++ b/pkg/fab/events/deliverclient/deliverclient_test.go @@ -10,17 +10,17 @@ import ( "testing" "time" - fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" delivermocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/mocks" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek" esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" fabclientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" "github.com/pkg/errors" @@ -38,11 +38,13 @@ var ( ) func TestOptionsInNewClient(t *testing.T) { - if _, err := New(newMockContext(), "", clientmocks.CreateDiscoveryService(peer1, peer2)); err == nil { - t.Fatalf("expecting error with no channel ID but got none") - } - - client, err := New(newMockContext(), "mychannel", clientmocks.CreateDiscoveryService(peer1, peer2), + channelID := "mychannel" + client, err := New( + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), WithBlockEvents(), ) if err != nil { @@ -52,9 +54,13 @@ func TestOptionsInNewClient(t *testing.T) { } func TestClientConnect(t *testing.T) { + channelID := "mychannel" eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(peer1, peer2), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), withConnectionProvider( clientmocks.NewProviderFactory().Provider( delivermocks.NewConnection( @@ -179,9 +185,13 @@ func TestReconnectRegistration(t *testing.T) { func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) { cp := clientmocks.NewProviderFactory() + channelID := "mychannel" eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(peer1, peer2), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), withConnectionProvider( cp.FlakeyProvider( connAttemptResult, @@ -215,12 +225,16 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) { cp := clientmocks.NewProviderFactory() - connectch := make(chan *fab.ConnectionEvent) + connectch := make(chan *clientdisp.ConnectionEvent) ledger := servicemocks.NewMockLedger(delivermocks.BlockEventFactory) + channelID := "mychannel" eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(peer1, peer2), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), withConnectionProvider( cp.FlakeyProvider( connAttemptResult, @@ -251,7 +265,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe go listenConnection(connectch, outcomech) // Test automatic reconnect handling - cp.Connection().ProduceEvent(dispatcher.NewDisconnectedEvent(errors.New("testing reconnect handling"))) + cp.Connection().ProduceEvent(clientdisp.NewDisconnectedEvent(errors.New("testing reconnect handling"))) var outcome clientmocks.Outcome @@ -289,8 +303,11 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA cp := clientmocks.NewProviderFactory() eventClient, err := New( - newMockContext(), channelID, - clientmocks.CreateDiscoveryService(peer1, peer2), + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), withConnectionProvider( cp.FlakeyProvider( connectResults, @@ -349,7 +366,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA time.Sleep(1 * time.Second) // Simulate a connection error - cp.Connection().ProduceEvent(dispatcher.NewDisconnectedEvent(errors.New("testing reconnect handling"))) + cp.Connection().ProduceEvent(clientdisp.NewDisconnectedEvent(errors.New("testing reconnect handling"))) time.Sleep(1 * time.Second) @@ -382,7 +399,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA } } -func listenConnection(eventch chan *fab.ConnectionEvent, outcome chan clientmocks.Outcome) { +func listenConnection(eventch chan *clientdisp.ConnectionEvent, outcome chan clientmocks.Outcome) { state := initialState for { @@ -434,7 +451,3 @@ func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, wait } } } - -func newMockContext() fabcontext.Client { - return fabclientmocks.NewMockContext(fabclientmocks.NewMockUser("user1")) -} diff --git a/pkg/fab/events/deliverclient/dispatcher/dispatcher.go b/pkg/fab/events/deliverclient/dispatcher/dispatcher.go index ce6a4daa24..3253c8bdf0 100755 --- a/pkg/fab/events/deliverclient/dispatcher/dispatcher.go +++ b/pkg/fab/events/deliverclient/dispatcher/dispatcher.go @@ -36,9 +36,9 @@ type Dispatcher struct { } // New returns a new deliver dispatcher -func New(context fabcontext.Client, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) *Dispatcher { +func New(context fabcontext.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher { return &Dispatcher{ - Dispatcher: *clientdisp.New(context, channelID, connectionProvider, discoveryService, opts...), + Dispatcher: *clientdisp.New(context, chConfig, connectionProvider, opts...), } } @@ -64,7 +64,7 @@ func (ed *Dispatcher) handleSeekEvent(e esdispatcher.Event) { } if err := ed.connection().Send(evt.SeekInfo); err != nil { - evt.ErrCh <- errors.Wrapf(err, "error sending seek info for channel [%s]", ed.ChannelID()) + evt.ErrCh <- errors.Wrapf(err, "error sending seek info for channel [%s]", ed.ChannelConfig().Name()) } else { evt.ErrCh <- nil } diff --git a/pkg/fab/events/deliverclient/dispatcher/dispatcher_test.go b/pkg/fab/events/deliverclient/dispatcher/dispatcher_test.go index 968d72b28a..922d54fefe 100755 --- a/pkg/fab/events/deliverclient/dispatcher/dispatcher_test.go +++ b/pkg/fab/events/deliverclient/dispatcher/dispatcher_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" @@ -32,13 +31,16 @@ func TestSeek(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( delivermocks.NewConnection( clientmocks.WithLedger(servicemocks.NewMockLedger(delivermocks.BlockEventFactory)), ), ), - clientmocks.CreateDiscoveryService(peer1, peer2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -90,7 +92,11 @@ func TestUnauthorized(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( delivermocks.NewConnection( clientmocks.WithResults( @@ -99,7 +105,6 @@ func TestUnauthorized(t *testing.T) { clientmocks.WithLedger(servicemocks.NewMockLedger(delivermocks.BlockEventFactory)), ), ), - clientmocks.CreateDiscoveryService(peer1, peer2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -113,7 +118,7 @@ func TestUnauthorized(t *testing.T) { // Register connection event errch := make(chan error) regch := make(chan fab.Registration) - conneventch := make(chan *fab.ConnectionEvent, 5) + conneventch := make(chan *clientdisp.ConnectionEvent, 5) dispatcherEventch <- clientdisp.NewRegisterConnectionEvent(conneventch, regch, errch) select { @@ -150,13 +155,16 @@ func TestBlockEvents(t *testing.T) { ledger := servicemocks.NewMockLedger(delivermocks.BlockEventFactory) dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( delivermocks.NewConnection( clientmocks.WithLedger(ledger), ), ), - clientmocks.CreateDiscoveryService(peer1, peer2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -215,13 +223,16 @@ func TestFilteredBlockEvents(t *testing.T) { ledger := servicemocks.NewMockLedger(delivermocks.FilteredBlockEventFactory) dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( delivermocks.NewConnection( clientmocks.WithLedger(ledger), ), ), - clientmocks.CreateDiscoveryService(peer1, peer2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -276,7 +287,3 @@ func TestFilteredBlockEvents(t *testing.T) { t.Fatalf("Error stopping dispatcher: %s", err) } } - -func newMockContext() fabcontext.Client { - return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) -} diff --git a/pkg/fab/events/eventhubclient/connection/connection.go b/pkg/fab/events/eventhubclient/connection/connection.go index 00c92655b5..6bc04e6756 100755 --- a/pkg/fab/events/eventhubclient/connection/connection.go +++ b/pkg/fab/events/eventhubclient/connection/connection.go @@ -13,6 +13,7 @@ import ( "time" fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "google.golang.org/grpc" @@ -35,13 +36,9 @@ type EventHubConnection struct { } // New returns a new Connection to the event hub. -func New(ctx fabcontext.Client, channelID string, url string, opts ...options.Opt) (*EventHubConnection, error) { - if channelID == "" { - return nil, errors.New("channel ID not provided") - } - +func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, url string, opts ...options.Opt) (*EventHubConnection, error) { connect, err := comm.NewConnection( - ctx, channelID, + ctx, chConfig, func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) { return pb.NewEventsClient(grpcconn).Chat(context.Background()) }, diff --git a/pkg/fab/events/eventhubclient/connection/connection_test.go b/pkg/fab/events/eventhubclient/connection/connection_test.go index 2b5dd073bc..e9d4d9f963 100755 --- a/pkg/fab/events/eventhubclient/connection/connection_test.go +++ b/pkg/fab/events/eventhubclient/connection/connection_test.go @@ -29,17 +29,14 @@ const ( ) func TestInvalidConnectionOpts(t *testing.T) { - if _, err := New(newMockContext(), "", eventURL); err == nil { - t.Fatalf("expecting error creating new connection without channel but got none") - } - if _, err := New(newMockContext(), "channelid", "grpcs://invalidhost:7053"); err == nil { + if _, err := New(newMockContext(), fabmocks.NewMockChannelCfg("channelid"), "grpcs://invalidhost:7053"); err == nil { t.Fatalf("expecting error creating new connection with invaid address but got none") } } func TestConnection(t *testing.T) { channelID := "mychannel" - conn, err := New(newMockContext(), channelID, eventURL) + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), eventURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } @@ -52,7 +49,7 @@ func TestConnection(t *testing.T) { func TestSend(t *testing.T) { channelID := "mychannel" - conn, err := New(newMockContext(), channelID, eventURL) + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), eventURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } @@ -132,7 +129,7 @@ func TestSend(t *testing.T) { func TestDisconnected(t *testing.T) { channelID := "mychannel" - conn, err := New(newMockContext(), channelID, eventURL) + conn, err := New(newMockContext(), fabmocks.NewMockChannelCfg(channelID), eventURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } diff --git a/pkg/fab/events/eventhubclient/discoveryprovider.go b/pkg/fab/events/eventhubclient/discoveryprovider.go new file mode 100644 index 0000000000..a621fdc65a --- /dev/null +++ b/pkg/fab/events/eventhubclient/discoveryprovider.go @@ -0,0 +1,82 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package eventhubclient + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint" + "github.com/pkg/errors" +) + +// discoveryProvider is a wrapper around the discovery provider that +// converts each peer into an EventEndpoint (which provides the event URL). +type discoveryProvider struct { + fab.DiscoveryProvider + ctx context.Client +} + +func newDiscoveryProvider(ctx context.Client) *discoveryProvider { + return &discoveryProvider{ + DiscoveryProvider: ctx.DiscoveryProvider(), + ctx: ctx, + } +} + +// CreateDiscoveryService creates a new DiscoveryService for the given channel +func (p *discoveryProvider) CreateDiscoveryService(channelID string) (fab.DiscoveryService, error) { + target, err := p.DiscoveryProvider.CreateDiscoveryService(channelID) + if err != nil { + return nil, err + } + return &discoveryService{ + DiscoveryService: target, + ctx: p.ctx, + }, nil +} + +type discoveryService struct { + fab.DiscoveryService + ctx context.Client +} + +func (s *discoveryService) GetPeers() ([]fab.Peer, error) { + var eventEndpoints []fab.Peer + + peers, err := s.DiscoveryService.GetPeers() + if err != nil { + return nil, err + } + + // Choose only the peers from the MSP in context + // since Event Hub connections are only allowed + // using the local MSP. + mspID := s.ctx.MspID() + + for _, peer := range peers { + if peer.MSPID() != mspID { + continue + } + + peerConfig, err := s.ctx.Config().PeerConfigByURL(peer.URL()) + if err != nil { + return nil, errors.Wrapf(err, "unable to determine event hub URL from [%s]", peer.URL()) + } + if peerConfig == nil { + return nil, errors.Errorf("unable to determine event hub URL from [%s]", peer.URL()) + } + + eventEndpoints = append(eventEndpoints, + &endpoint.EventEndpoint{ + Peer: peer, + EvtURL: peerConfig.EventURL, + }, + ) + } + + return eventEndpoints, nil +} diff --git a/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go b/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go index 52d504ae5a..907177e00d 100755 --- a/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go +++ b/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go @@ -7,7 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package dispatcher import ( - "github.com/hyperledger/fabric-sdk-go/pkg/common/context" + fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" @@ -35,9 +35,9 @@ type Dispatcher struct { } // New creates a new event hub dispatcher -func New(context context.Client, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) *Dispatcher { +func New(context fabcontext.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher { return &Dispatcher{ - Dispatcher: *clientdisp.New(context, channelID, connectionProvider, discoveryService, opts...), + Dispatcher: *clientdisp.New(context, chConfig, connectionProvider, opts...), } } diff --git a/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go b/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go index 80eacab000..a67787d27e 100755 --- a/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go +++ b/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go @@ -36,13 +36,16 @@ var ( func TestRegisterInterests(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(endpoint1, endpoint2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( ehmocks.NewConnection( clientmocks.WithLedger(servicemocks.NewMockLedger(ehmocks.BlockEventFactory)), ), ), - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -129,7 +132,11 @@ func TestRegisterInterests(t *testing.T) { func TestRegisterInterestsInvalid(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(endpoint1, endpoint2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( ehmocks.NewConnection( clientmocks.WithLedger(servicemocks.NewMockLedger(ehmocks.BlockEventFactory)), @@ -139,7 +146,6 @@ func TestRegisterInterestsInvalid(t *testing.T) { ), ), ), - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -226,7 +232,11 @@ func TestRegisterInterestsInvalid(t *testing.T) { func TestTimedOutRegister(t *testing.T) { channelID := "testchannel" dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(endpoint1, endpoint2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( ehmocks.NewConnection( clientmocks.WithResults( @@ -235,7 +245,6 @@ func TestTimedOutRegister(t *testing.T) { clientmocks.WithLedger(servicemocks.NewMockLedger(ehmocks.BlockEventFactory)), ), ), - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -283,13 +292,16 @@ func TestBlockEvents(t *testing.T) { channelID := "testchannel" ledger := servicemocks.NewMockLedger(ehmocks.BlockEventFactory) dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(endpoint1, endpoint2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( ehmocks.NewConnection( clientmocks.WithLedger(ledger), ), ), - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) @@ -352,13 +364,16 @@ func TestFilteredBlockEvents(t *testing.T) { channelID := "testchannel" ledger := servicemocks.NewMockLedger(ehmocks.FilteredBlockEventFactory) dispatcher := New( - newMockContext(), channelID, + fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(endpoint1, endpoint2), + ), + fabmocks.NewMockChannelCfg(channelID), clientmocks.NewProviderFactory().Provider( ehmocks.NewConnection( clientmocks.WithLedger(ledger), ), ), - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), ) if err := dispatcher.Start(); err != nil { t.Fatalf("Error starting dispatcher: %s", err) diff --git a/pkg/fab/events/eventhubclient/eventhubclient.go b/pkg/fab/events/eventhubclient/eventhubclient.go index f929eabb5f..1cae67b12f 100755 --- a/pkg/fab/events/eventhubclient/eventhubclient.go +++ b/pkg/fab/events/eventhubclient/eventhubclient.go @@ -24,14 +24,14 @@ import ( var logger = logging.NewLogger("fabsdk/fab") -var ehConnProvider = func(channelID string, context context.Client, peer fab.Peer) (api.Connection, error) { +var ehConnProvider = func(context context.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) { eventEndpoint, ok := peer.(api.EventEndpoint) if !ok { panic("peer is not an EventEndpoint") } return connection.New( - context, channelID, eventEndpoint.EventURL(), + context, chConfig, eventEndpoint.EventURL(), ) } @@ -42,18 +42,18 @@ type Client struct { } // New returns a new event hub client -func New(context context.Client, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) { - if channelID == "" { - return nil, errors.New("expecting channel ID") - } - +func New(context context.Client, chConfig fab.ChannelCfg, opts ...options.Opt) (*Client, error) { params := defaultParams() options.Apply(params, opts) + // The EventHub requires a custom Discovery Provider + // that produces EventEndpoints (which include the event URL). + ehCtx := newEventHubContext(context) + client := &Client{ Client: *client.New( params.permitBlockEvents, - dispatcher.New(context, channelID, params.connProvider, discoveryService, opts...), + dispatcher.New(ehCtx, chConfig, params.connProvider, opts...), opts..., ), params: *params, @@ -88,3 +88,20 @@ func (c *Client) registerInterests() error { logger.Debugf("successfully sent register interests") return nil } + +// ehContext overrides the DiscoveryProvider by returning +// the event hub discovery provider +type ehContext struct { + context.Client +} + +func newEventHubContext(ctx context.Client) context.Client { + return &ehContext{ + Client: ctx, + } +} + +// DiscoveryProvider returns a custom discovery provider for the event hub +func (ctx *ehContext) DiscoveryProvider() fab.DiscoveryProvider { + return newDiscoveryProvider(ctx.Client) +} diff --git a/pkg/fab/events/eventhubclient/eventhubclient_test.go b/pkg/fab/events/eventhubclient/eventhubclient_test.go index 88845e35f8..5167e681c4 100755 --- a/pkg/fab/events/eventhubclient/eventhubclient_test.go +++ b/pkg/fab/events/eventhubclient/eventhubclient_test.go @@ -12,18 +12,17 @@ import ( "testing" "time" - "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint" ehclientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/mocks" ehmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/mocks" esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" + fabclientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" @@ -37,16 +36,17 @@ const ( var ( defaultOpts = []options.Opt{} - endpoint1 = newMockEventEndpoint("grpcs://peer1.example.com:7053") - endpoint2 = newMockEventEndpoint("grpcs://peer2.example.com:7053") + peer1 = fabclientmocks.NewMockPeer("peer1", "peer1.example.com:7051") + peer2 = fabclientmocks.NewMockPeer("peer2", "peer2.example.com:7051") + eventURL1 = "peer1.example.com:7053" + eventURL2 = "peer2.example.com:7053" ) func TestOptionsInNewClient(t *testing.T) { - if _, err := New(newMockContext(), "", clientmocks.CreateDiscoveryService(endpoint1, endpoint2)); err == nil { - t.Fatalf("expecting error with no channel ID but got none") - } - - client, err := New(newMockContext(), "mychannel", clientmocks.CreateDiscoveryService(endpoint1, endpoint2), + channelID := "mychannel" + client, err := New( + newMockContext(), + fabmocks.NewMockChannelCfg(channelID), WithBlockEvents(), ) if err != nil { @@ -56,9 +56,10 @@ func TestOptionsInNewClient(t *testing.T) { } func TestClientConnect(t *testing.T) { + channelID := "mychannel" eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), + newMockContext(), + fabmocks.NewMockChannelCfg(channelID), withConnectionProviderAndInterests( clientmocks.NewProviderFactory().Provider( ehclientmocks.NewConnection( @@ -88,9 +89,10 @@ func TestClientConnect(t *testing.T) { } func TestTimeoutClientConnect(t *testing.T) { + channelID := "mychannel" eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), + newMockContext(), + fabmocks.NewMockChannelCfg(channelID), withConnectionProviderAndInterests( clientmocks.NewProviderFactory().Provider( ehclientmocks.NewConnection( @@ -201,9 +203,10 @@ func TestReconnectRegistration(t *testing.T) { } func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) { + channelID := "mychannel" eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), + newMockContext(), + fabmocks.NewMockChannelCfg(channelID), withConnectionProviderAndInterests( clientmocks.NewProviderFactory().FlakeyProvider( connAttemptResult, @@ -235,14 +238,15 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo } func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) { + channelID := "mychannel" cp := clientmocks.NewProviderFactory() - connectch := make(chan *fab.ConnectionEvent) + connectch := make(chan *clientdisp.ConnectionEvent) ledger := servicemocks.NewMockLedger(ehmocks.BlockEventFactory) eventClient, err := New( - newMockContext(), "mychannel", - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), + newMockContext(), + fabmocks.NewMockChannelCfg(channelID), withConnectionProviderAndInterests( cp.FlakeyProvider( connAttemptResult, @@ -274,7 +278,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe go listenConnection(t, connectch, outcomech) // Test automatic reconnect handling - cp.Connection().ProduceEvent(dispatcher.NewDisconnectedEvent(errors.New("testing reconnect handling"))) + cp.Connection().ProduceEvent(clientdisp.NewDisconnectedEvent(errors.New("testing reconnect handling"))) var outcome clientmocks.Outcome @@ -300,8 +304,8 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num cp := clientmocks.NewProviderFactory() eventClient, err := New( - newMockContext(), channelID, - clientmocks.CreateDiscoveryService(endpoint1, endpoint2), + newMockContext(), + fabmocks.NewMockChannelCfg(channelID), withConnectionProviderAndInterests( cp.FlakeyProvider( connectResults, @@ -361,7 +365,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num time.Sleep(500 * time.Millisecond) // Simulate a connection error - cp.Connection().ProduceEvent(dispatcher.NewDisconnectedEvent(errors.New("testing reconnect handling"))) + cp.Connection().ProduceEvent(clientdisp.NewDisconnectedEvent(errors.New("testing reconnect handling"))) // Wait for the client to reconnect time.Sleep(2 * time.Second) @@ -400,7 +404,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num } } -func listenConnection(t *testing.T, eventch chan *fab.ConnectionEvent, outcome chan clientmocks.Outcome) { +func listenConnection(t *testing.T, eventch chan *clientdisp.ConnectionEvent, outcome chan clientmocks.Outcome) { state := initialState for { @@ -456,12 +460,35 @@ func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, wait } } -func newMockContext() context.Client { - return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) +type mockEventURLConfig struct { + core.Config + eventURLMap map[string]string } -func newMockEventEndpoint(url string) api.EventEndpoint { - return &endpoint.EventEndpoint{ - EvtURL: url, +func newMockEventURLConfig() *mockEventURLConfig { + return &mockEventURLConfig{ + eventURLMap: make(map[string]string), } } + +func (c *mockEventURLConfig) setURL(url, mapsToURL string) { + c.eventURLMap[url] = mapsToURL +} + +func (c *mockEventURLConfig) PeerConfigByURL(url string) (*core.PeerConfig, error) { + return &core.PeerConfig{ + EventURL: c.eventURLMap[url], + }, nil +} + +func newMockContext() *fabmocks.MockContext { + ctx := fabmocks.NewMockContextWithCustomDiscovery( + fabmocks.NewMockUser("user1"), + clientmocks.NewDiscoveryProvider(peer1, peer2), + ) + config := newMockEventURLConfig() + config.setURL(peer1.URL(), eventURL1) + config.setURL(peer2.URL(), eventURL2) + ctx.SetConfig(config) + return ctx +}