Skip to content

Commit

Permalink
FABGW-6 Implement Submit()
Browse files Browse the repository at this point in the history
Use discovery to get peer endorsers and orderers in order to submit a transaction to the ledger

A number of TODO comments are in the code marking where additional logic will be added as part of this user story (or future story where specified). This includes adding retry logic according to endorsement layouts, configurable timeouts, updating stored channel membership in response to config updates.

Signed-off-by: andrew-coleman <[email protected]>
  • Loading branch information
andrew-coleman authored and sykesm committed Feb 25, 2021
1 parent 7172484 commit 7a8e1de
Show file tree
Hide file tree
Showing 18 changed files with 2,403 additions and 533 deletions.
21 changes: 12 additions & 9 deletions internal/peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ import (
peergossip "github.com/hyperledger/fabric/internal/peer/gossip"
"github.com/hyperledger/fabric/internal/peer/version"
"github.com/hyperledger/fabric/internal/pkg/comm"
gatewayserver "github.com/hyperledger/fabric/internal/pkg/gateway/server"
"github.com/hyperledger/fabric/internal/pkg/gateway"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/protoutil"
Expand Down Expand Up @@ -802,8 +802,9 @@ func serve(args []string) error {
coreConfig.ValidatorPoolSize,
)

var discoveryService *discovery.Service
if coreConfig.DiscoveryEnabled {
ds := createDiscoveryService(
discoveryService = createDiscoveryService(
coreConfig,
peerInstance,
peerServer,
Expand All @@ -816,17 +817,19 @@ func serve(args []string) error {
gossipService,
)
logger.Info("Discovery service activated")
discprotos.RegisterDiscoveryServer(peerServer.Server(), ds)
discprotos.RegisterDiscoveryServer(peerServer.Server(), discoveryService)
}

if coreConfig.GatewayOptions.Enabled {
logger.Info("Starting peer with Gateway enabled")
gs, err := gatewayserver.CreateGatewayServer(serverEndorser)
if err != nil {
logger.Panicf("Failed to create Gateway server: %s", err)
if coreConfig.DiscoveryEnabled {
logger.Info("Starting peer with Gateway enabled")
gatewayprotos.RegisterGatewayServer(
peerServer.Server(),
gateway.CreateServer(&gateway.EndorserServerAdapter{Server: serverEndorser}, discoveryService, peerInstance.GossipService.SelfMembershipInfo().Endpoint),
)
} else {
logger.Warning("Discovery service must be enabled for embedded gateway")
}
gatewayprotos.RegisterGatewayServer(peerServer.Server(), gs)
logger.Info("Gateway server activated")
}

logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)
Expand Down
77 changes: 61 additions & 16 deletions internal/pkg/gateway/server/api.go → internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
/*
Copyright 2020 IBM All Rights Reserved.
Copyright 2021 IBM All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package server
package gateway

import (
"context"
"fmt"

"github.com/hyperledger/fabric/protoutil"

pb "github.com/hyperledger/fabric-protos-go/gateway"
gp "github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/protoutil"
)

// Evaluate will invoke the transaction function as specified in the SignedProposal
func (gs *Server) Evaluate(ctx context.Context, proposedTransaction *pb.ProposedTransaction) (*pb.Result, error) {
func (gs *Server) Evaluate(ctx context.Context, proposedTransaction *gp.ProposedTransaction) (*gp.Result, error) {
if proposedTransaction == nil {
return nil, fmt.Errorf("a proposed transaction is required")
}
signedProposal := proposedTransaction.Proposal
channel, chaincodeID, err := getChannelAndChaincodeFromSignedProposal(proposedTransaction.Proposal)
if err != nil {
// TODO need to specify status codes
return nil, fmt.Errorf("failed to unpack channel header: %w", err)
}

endorsers := gs.registry.Endorsers(channel, chaincodeID)
endorsers, err := gs.registry.endorsers(channel, chaincodeID)
if err != nil {
return nil, err
}
if len(endorsers) == 0 {
return nil, fmt.Errorf("no endorsing peers found for channel: %s", proposedTransaction.ChannelId)
}
Expand All @@ -38,8 +44,14 @@ func (gs *Server) Evaluate(ctx context.Context, proposedTransaction *pb.Proposed

// Endorse will collect endorsements by invoking the transaction function specified in the SignedProposal against
// sufficient Peers to satisfy the endorsement policy.
func (gs *Server) Endorse(ctx context.Context, proposedTransaction *pb.ProposedTransaction) (*pb.PreparedTransaction, error) {
func (gs *Server) Endorse(ctx context.Context, proposedTransaction *gp.ProposedTransaction) (*gp.PreparedTransaction, error) {
if proposedTransaction == nil {
return nil, fmt.Errorf("a proposed transaction is required")
}
signedProposal := proposedTransaction.Proposal
if signedProposal == nil {
return nil, fmt.Errorf("the proposed transaction must contain a signed proposal")
}
proposal, err := protoutil.UnmarshalProposal(signedProposal.ProposalBytes)
if err != nil {
return nil, err
Expand All @@ -48,10 +60,13 @@ func (gs *Server) Endorse(ctx context.Context, proposedTransaction *pb.ProposedT
if err != nil {
return nil, fmt.Errorf("failed to unpack channel header: %w", err)
}
endorsers := gs.registry.Endorsers(channel, chaincodeID)
endorsers, err := gs.registry.endorsers(channel, chaincodeID)
if err != nil {
return nil, err
}

var responses []*peer.ProposalResponse
// send to all the endorsers
// send to all the endorsers - TODO fan out in parallel
for _, endorser := range endorsers {
response, err := endorser.ProcessProposal(ctx, signedProposal)
if err != nil {
Expand All @@ -70,19 +85,49 @@ func (gs *Server) Endorse(ctx context.Context, proposedTransaction *pb.ProposedT
return nil, fmt.Errorf("failed to extract value from response payload: %w", err)
}

preparedTxn := &pb.PreparedTransaction{
preparedTxn := &gp.PreparedTransaction{
TxId: proposedTransaction.TxId,
ChannelId: proposedTransaction.ChannelId,
ChannelId: channel,
Response: retVal,
Envelope: env,
}
return preparedTxn, nil
}

// Submit will send the signed transaction to the ordering service. The output stream will close
// once the transaction is committed on a sufficient number of peers according to a defined policy.
func (gs *Server) Submit(txn *pb.PreparedTransaction, cs pb.Gateway_SubmitServer) error {
// not yet implemented in embedded gateway
// once the transaction is committed on a sufficient number of remoteEndorsers according to a defined policy.
func (gs *Server) Submit(txn *gp.PreparedTransaction, cs gp.Gateway_SubmitServer) error {
if txn == nil {
return fmt.Errorf("a signed prepared transaction is required")
}
if cs == nil {
return fmt.Errorf("a submit server is required")
}
orderers, err := gs.registry.orderers(txn.ChannelId)
if err != nil {
return err
}

if len(orderers) == 0 {
return fmt.Errorf("no broadcastClients discovered")
}

// send to first orderer for now
logger.Info("Submitting txn to orderer")
if err := orderers[0].Send(txn.Envelope); err != nil {
return fmt.Errorf("failed to send envelope to orderer: %w", err)
}

response, err := orderers[0].Recv()
if err != nil {
return err
}

if response == nil {
return fmt.Errorf("received nil response from orderer")
}

return fmt.Errorf("Submit() not implemented")
return cs.Send(&gp.Event{
Value: []byte(response.Info),
})
}
Loading

0 comments on commit 7a8e1de

Please sign in to comment.