Skip to content

Commit

Permalink
[FAB-8610] Integrate with latest Client Context
Browse files Browse the repository at this point in the history
- Integrate with the latest context.Client which
  includes the InfraProvider and DiscoveryProvider
- Remove RegisterConnectionEvent from the
  EventClient interface since this is an option
  when creating an events client

Change-Id: Id96646eb43d2cbe2240687c257bb5964a0e8d89b
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Mar 9, 2018
1 parent 8e5aa5d commit 53a4bd0
Show file tree
Hide file tree
Showing 26 changed files with 463 additions and 271 deletions.
5 changes: 0 additions & 5 deletions pkg/context/api/fab/eventservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,4 @@ type EventClient interface {
// Close closes the connection to the event server and releases all resources.
// Once this function is invoked the client may no longer be used.
Close()

// RegisterConnectionEvent registers a connection event. The returned
// ConnectionEvent channel is called whenever the client clients to
// or disconnects from the event server
RegisterConnectionEvent() (Registration, chan ConnectionEvent, error)
}
17 changes: 9 additions & 8 deletions pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/urlutil"
"github.com/hyperledger/fabric-sdk-go/pkg/logging"
Expand All @@ -29,16 +30,16 @@ type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error)

// GRPCConnection manages the GRPC connection and client stream
type GRPCConnection struct {
channelID string
context fabcontext.Client
chConfig fab.ChannelCfg
conn *grpc.ClientConn
stream grpc.ClientStream
context fabcontext.Client
tlsCertHash []byte
done int32
}

// NewConnection creates a new connection
func NewConnection(ctx fabcontext.Client, channelID string, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) {
func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) {
if url == "" {
return nil, errors.New("server URL not specified")
}
Expand Down Expand Up @@ -73,17 +74,17 @@ func NewConnection(ctx fabcontext.Client, channelID string, streamProvider Strea
}

return &GRPCConnection{
channelID: channelID,
context: ctx,
chConfig: chConfig,
conn: grpcconn,
stream: stream,
context: ctx,
tlsCertHash: comm.TLSCertHash(ctx.Config()),
}, nil
}

// ChannelID returns the ID of the channel
func (c *GRPCConnection) ChannelID() string {
return c.channelID
// ChannelConfig returns the channel configuration
func (c *GRPCConnection) ChannelConfig() fab.ChannelCfg {
return c.chConfig
}

// Close closes the connection
Expand Down
12 changes: 5 additions & 7 deletions pkg/fab/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ func TestConnection(t *testing.T) {
channelID := "testchannel"

context := newMockContext()
chConfig := fabmocks.NewMockChannelCfg(channelID)

conn, err := NewConnection(context, channelID, testStream, "")
conn, err := NewConnection(context, chConfig, testStream, "")
if err == nil {
t.Fatalf("expected error creating new connection with empty URL")
}
conn, err = NewConnection(context, channelID, testStream, "invalidhost:0000",
conn, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
WithFailFast(true),
WithCertificate(nil),
WithHostOverride(""),
Expand All @@ -50,21 +51,18 @@ func TestConnection(t *testing.T) {
if err == nil {
t.Fatalf("expected error creating new connection with invalid URL")
}
conn, err = NewConnection(context, channelID, invalidStream, peerURL)
conn, err = NewConnection(context, chConfig, invalidStream, peerURL)
if err == nil {
t.Fatalf("expected error creating new connection with invalid stream but got none")
}

conn, err = NewConnection(context, channelID, testStream, peerURL)
conn, err = NewConnection(context, chConfig, testStream, peerURL)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}
if conn.Closed() {
t.Fatalf("expected connection to be open")
}
if conn.ChannelID() != channelID {
t.Fatalf("expected channel ID [%s] but got [%s]", channelID, conn.ChannelID())
}
if conn.Stream() == nil {
t.Fatalf("got invalid stream")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fab/events/api/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ type Connection interface {
}

// ConnectionProvider creates a Connection.
type ConnectionProvider func(channelID string, context context.Client, peer fab.Peer) (Connection, error)
type ConnectionProvider func(context context.Client, chConfig fab.ChannelCfg, peer fab.Peer) (Connection, error)
14 changes: 7 additions & 7 deletions pkg/fab/events/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Client struct {
eventservice.Service
params
sync.RWMutex
connEvent chan *fab.ConnectionEvent
connEvent chan *dispatcher.ConnectionEvent
connectionState int32
stopped int32
registerOnce sync.Once
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Client) connect() error {

c.registerOnce.Do(func() {
logger.Debugf("Submitting connection event registration...")
_, eventch, err := c.RegisterConnectionEvent()
_, eventch, err := c.registerConnectionEvent()
if err != nil {
logger.Errorf("Error registering for connection events: %s", err)
c.Close()
Expand Down Expand Up @@ -239,15 +239,15 @@ func (c *Client) RegisterBlockEvent(filter ...fab.BlockFilter) (fab.Registration
return c.Service.RegisterBlockEvent(filter...)
}

// RegisterConnectionEvent registers a connection event. The returned
// registerConnectionEvent registers a connection event. The returned
// ConnectionEvent channel will be called whenever the client clients or disconnects
// from the event server
func (c *Client) RegisterConnectionEvent() (fab.Registration, chan *fab.ConnectionEvent, error) {
func (c *Client) registerConnectionEvent() (fab.Registration, chan *dispatcher.ConnectionEvent, error) {
if c.Stopped() {
return nil, nil, errors.New("event client is closed")
}

eventch := make(chan *fab.ConnectionEvent, c.eventConsumerBufferSize)
eventch := make(chan *dispatcher.ConnectionEvent, c.eventConsumerBufferSize)
errch := make(chan error)
regch := make(chan fab.Registration)
c.Submit(dispatcher.NewRegisterConnectionEvent(eventch, regch, errch))
Expand Down Expand Up @@ -348,13 +348,13 @@ func (c *Client) closeConnectEventChan() {
}
}

func (c *Client) connectEventChan() chan *fab.ConnectionEvent {
func (c *Client) connectEventChan() chan *dispatcher.ConnectionEvent {
c.RLock()
defer c.RUnlock()
return c.connEventCh
}

func (c *Client) notifyConnectEventChan(event *fab.ConnectionEvent) {
func (c *Client) notifyConnectEventChan(event *dispatcher.ConnectionEvent) {
c.RLock()
defer c.RUnlock()
if c.connEventCh != nil {
Expand Down
Loading

0 comments on commit 53a4bd0

Please sign in to comment.