Skip to content

Commit

Permalink
Fix timeout in orderer connection from gateway
Browse files Browse the repository at this point in the history
The gateway was invoking the Broadcast() method on the orderer and caching the returned stream.  This was causing timeout issues.
The code has been rewritten to open a new stream for each transaction submission.

Signed-off-by: andrew-coleman <[email protected]>
  • Loading branch information
andrew-coleman authored and sykesm committed Mar 25, 2021
1 parent d88e09c commit a559b2c
Show file tree
Hide file tree
Showing 9 changed files with 774 additions and 560 deletions.
13 changes: 11 additions & 2 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,25 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
}

orderer := orderers[0] // send to first orderer for now

broadcast, err := orderer.client.Broadcast(ctx)
if err != nil {
return nil, rpcError(
codes.Aborted,
"failed to send transaction to orderer",
&gp.EndpointError{Address: orderer.address, MspId: orderer.mspid, Message: err.Error()},
)
}
logger.Info("Submitting txn to orderer")
if err := orderer.client.Send(txn); err != nil {
if err := broadcast.Send(txn); err != nil {
return nil, rpcError(
codes.Aborted,
"failed to send transaction to orderer",
&gp.EndpointError{Address: orderer.address, MspId: orderer.mspid, Message: err.Error()},
)
}

response, err := orderer.client.Recv()
response, err := broadcast.Recv()
if err != nil {
return nil, rpcError(
codes.Aborted,
Expand Down
54 changes: 43 additions & 11 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type discovery interface {

//go:generate counterfeiter -o mocks/abclient.go --fake-name ABClient . abClient
type abClient interface {
ab.AtomicBroadcastClient
}

//go:generate counterfeiter -o mocks/abbclient.go --fake-name ABBClient . abbClient
type abbClient interface {
ab.AtomicBroadcast_BroadcastClient
}

Expand All @@ -72,6 +77,7 @@ type endpointDef struct {
proposalError error
ordererResponse string
ordererStatus int32
ordererBroadcastError error
ordererSendError error
ordererRecvError error
}
Expand Down Expand Up @@ -422,6 +428,22 @@ func TestSubmit(t *testing.T) {
},
errString: "no broadcastClients discovered",
},
{
name: "orderer broadcast fails",
plan: endorsementPlan{
"g1": {{endpoint: "localhost:7051"}},
},
endpointDefinition: &endpointDef{
proposalResponseStatus: 200,
ordererBroadcastError: status.Error(codes.FailedPrecondition, "Orderer not listening!"),
},
errString: "rpc error: code = Aborted desc = failed to send transaction to orderer",
errDetails: []*pb.EndpointError{{
Address: "orderer:7050",
MspId: "msp1",
Message: "rpc error: code = FailedPrecondition desc = Orderer not listening!",
}},
},
{
name: "send to orderer fails",
plan: endorsementPlan{
Expand Down Expand Up @@ -455,15 +477,17 @@ func TestSubmit(t *testing.T) {
}},
},
{
name: "orderer returns nil",
name: "orderer Send() returns nil",
plan: endorsementPlan{
"g1": {{endpoint: "localhost:7051"}},
},
postSetup: func(def *preparedTest) {
def.server.registry.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
def.server.registry.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
abc := &mocks.ABClient{}
abc.RecvReturns(nil, nil)
return abc, nil
abbc := &mocks.ABBClient{}
abbc.RecvReturns(nil, nil)
abc.BroadcastReturns(abbc, nil)
return abc
}
},
errString: "received nil response from orderer",
Expand All @@ -474,13 +498,15 @@ func TestSubmit(t *testing.T) {
"g1": {{endpoint: "localhost:7051"}},
},
postSetup: func(def *preparedTest) {
def.server.registry.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
def.server.registry.endpointFactory.connectOrderer = func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
abc := &mocks.ABClient{}
abbc := &mocks.ABBClient{}
response := &ab.BroadcastResponse{
Status: cp.Status_BAD_REQUEST,
}
abc.RecvReturns(response, nil)
return abc, nil
abbc.RecvReturns(response, nil)
abc.BroadcastReturns(abbc, nil)
return abc
}
},
errString: cp.Status_name[int32(cp.Status_BAD_REQUEST)],
Expand Down Expand Up @@ -824,14 +850,20 @@ func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer)
}
return e
},
connectOrderer: func(_ *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
abc := &mocks.ABClient{}
abc.SendReturns(definition.ordererSendError)
abc.RecvReturns(&ab.BroadcastResponse{
if definition.ordererBroadcastError != nil {
abc.BroadcastReturns(nil, definition.ordererBroadcastError)
return abc
}
abbc := &mocks.ABBClient{}
abbc.SendReturns(definition.ordererSendError)
abbc.RecvReturns(&ab.BroadcastResponse{
Info: definition.ordererResponse,
Status: cp.Status(definition.ordererStatus),
}, definition.ordererRecvError)
return abc, nil
abc.BroadcastReturns(abbc, nil)
return abc
},
dialer: dialer,
}
Expand Down
16 changes: 4 additions & 12 deletions internal/pkg/gateway/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type endorser struct {
}

type orderer struct {
client ab.AtomicBroadcast_BroadcastClient
client ab.AtomicBroadcastClient
*endpointConfig
}

Expand All @@ -34,7 +34,7 @@ type endpointConfig struct {

type (
endorserConnector func(*grpc.ClientConn) peer.EndorserClient
ordererConnector func(*grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error)
ordererConnector func(*grpc.ClientConn) ab.AtomicBroadcastClient
)

//go:generate counterfeiter -o mocks/dialer.go --fake-name Dialer . dialer
Expand Down Expand Up @@ -69,14 +69,10 @@ func (ef *endpointFactory) newOrderer(address, mspid string, tlsRootCerts [][]by
}
connectOrderer := ef.connectOrderer
if connectOrderer == nil {
connectOrderer = defaultOrdererConnector
}
client, err := connectOrderer(conn)
if err != nil {
return nil, err
connectOrderer = ab.NewAtomicBroadcastClient
}
return &orderer{
client: client,
client: connectOrderer(conn),
endpointConfig: &endpointConfig{address: address, mspid: mspid},
}, nil
}
Expand Down Expand Up @@ -108,7 +104,3 @@ func (ef *endpointFactory) newConnection(address string, tlsRootCerts [][]byte)
}
return conn, nil
}

func defaultOrdererConnector(conn *grpc.ClientConn) (ab.AtomicBroadcast_BroadcastClient, error) {
return ab.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
}
Loading

0 comments on commit a559b2c

Please sign in to comment.