From 2d73fad49fec3d4fb44f9e106fb38d38faa23b18 Mon Sep 17 00:00:00 2001 From: Andrew Coleman Date: Mon, 20 Apr 2020 17:36:43 +0100 Subject: [PATCH] FABG-940 Register for block/contract/commit events (#63) Add support to gateway package to handle events. Consistent with Gateway programming model, but using Go channels instead of callback functions. Includes unit tests & integration tests. Signed-off-by: andrew-coleman --- pkg/gateway/contract.go | 22 +- pkg/gateway/contract_test.go | 38 ++- pkg/gateway/defaultcommithandlers.go | 53 ---- pkg/gateway/gateway.go | 25 +- pkg/gateway/gateway_test.go | 47 ---- pkg/gateway/network.go | 28 +++ pkg/gateway/network_test.go | 50 ++++ pkg/gateway/spi.go | 12 - pkg/gateway/transaction.go | 86 ++++++- pkg/gateway/transaction_test.go | 242 ++++++++++++++++++- pkg/gateway/wallet_test.go | 58 +++++ test/integration/pkg/gateway/gateway.go | 107 ++++++++ test/integration/pkg/gateway/gateway_test.go | 12 + 13 files changed, 638 insertions(+), 142 deletions(-) delete mode 100644 pkg/gateway/defaultcommithandlers.go diff --git a/pkg/gateway/contract.go b/pkg/gateway/contract.go index 92e23f7564..1545220cb0 100644 --- a/pkg/gateway/contract.go +++ b/pkg/gateway/contract.go @@ -6,7 +6,10 @@ SPDX-License-Identifier: Apache-2.0 package gateway -import "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" +) // A Contract object represents a smart contract instance in a network. // Applications should get a Contract instance from a Network using the GetContract method @@ -61,3 +64,20 @@ func (c *Contract) SubmitTransaction(name string, args ...string) ([]byte, error func (c *Contract) CreateTransaction(name string, args ...TransactionOption) (*Transaction, error) { return newTransaction(name, c, args...) } + +// RegisterEvent registers for chaincode events. Unregister must be called when the registration is no longer needed. +// Parameters: +// eventFilter is the chaincode event filter (regular expression) for which events are to be received +// +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (c *Contract) RegisterEvent(eventFilter string) (fab.Registration, <-chan *fab.CCEvent, error) { + return c.network.event.RegisterChaincodeEvent(c.chaincodeID, eventFilter) +} + +// Unregister removes the given registration and closes the event channel. +// Parameters: +// registration is the registration handle that was returned from RegisterContractEvent method +func (c *Contract) Unregister(registration fab.Registration) { + c.network.event.Unregister(registration) +} diff --git a/pkg/gateway/contract_test.go b/pkg/gateway/contract_test.go index aa5ed2d494..f98f8f196a 100644 --- a/pkg/gateway/contract_test.go +++ b/pkg/gateway/contract_test.go @@ -40,9 +40,8 @@ func TestSubmitTransaction(t *testing.T) { gw := &Gateway{ options: &gatewayOptions{ - CommitHandler: DefaultCommitHandlers.OrgAll, - Discovery: defaultDiscovery, - Timeout: defaultTimeout, + Discovery: defaultDiscovery, + Timeout: defaultTimeout, }, } @@ -70,9 +69,8 @@ func TestEvaluateTransaction(t *testing.T) { gw := &Gateway{ options: &gatewayOptions{ - CommitHandler: DefaultCommitHandlers.OrgAll, - Discovery: defaultDiscovery, - Timeout: defaultTimeout, + Discovery: defaultDiscovery, + Timeout: defaultTimeout, }, } @@ -94,3 +92,31 @@ func TestEvaluateTransaction(t *testing.T) { t.Fatalf("Incorrect transaction result: %s", result) } } + +func TestContractEvent(t *testing.T) { + c := mockChannelProvider("mychannel") + + gw := &Gateway{ + options: &gatewayOptions{ + Discovery: defaultDiscovery, + Timeout: defaultTimeout, + }, + } + + nw, err := newNetwork(gw, c) + + if err != nil { + t.Fatalf("Failed to create network: %s", err) + } + + contr := nw.GetContract("contract1") + + eventID := "test([a-zA-Z]+)" + + reg, _, err := contr.RegisterEvent(eventID) + if err != nil { + t.Fatalf("Failed to register contract event: %s", err) + } + defer contr.Unregister(reg) + +} diff --git a/pkg/gateway/defaultcommithandlers.go b/pkg/gateway/defaultcommithandlers.go deleted file mode 100644 index c60170221e..0000000000 --- a/pkg/gateway/defaultcommithandlers.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2020 IBM All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package gateway - -type list struct { - None CommitHandlerFactory - OrgAll CommitHandlerFactory - OrgAny CommitHandlerFactory - NetworkAll CommitHandlerFactory - NetworkAny CommitHandlerFactory -} - -// DefaultCommitHandlers provides the built-in commit handler implementations. -var DefaultCommitHandlers = &list{ - None: nil, - OrgAll: orgAll, - OrgAny: orgAny, - NetworkAll: networkAll, - NetworkAny: networkAny, -} - -type commithandler struct { - transactionID string - network Network -} - -func (ch *commithandler) StartListening() { -} - -func (ch *commithandler) WaitForEvents(timeout int64) { -} - -func (ch *commithandler) CancelListening() { -} - -type commithandlerfactory struct { -} - -func (chf *commithandlerfactory) Create(txid string, network Network) CommitHandler { - return &commithandler{ - transactionID: txid, - network: network, - } -} - -var orgAll = &commithandlerfactory{} -var orgAny = &commithandlerfactory{} -var networkAll = &commithandlerfactory{} -var networkAny = &commithandlerfactory{} diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 990e03535f..904ecb1ad1 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -32,11 +32,10 @@ type Gateway struct { } type gatewayOptions struct { - Identity mspProvider.SigningIdentity - User string - CommitHandler CommitHandlerFactory - Discovery bool - Timeout time.Duration + Identity mspProvider.SigningIdentity + User string + Discovery bool + Timeout time.Duration } // Option functional arguments can be supplied when connecting to the gateway. @@ -54,9 +53,8 @@ func Connect(config ConfigOption, identity IdentityOption, options ...Option) (* g := &Gateway{ options: &gatewayOptions{ - CommitHandler: DefaultCommitHandlers.OrgAll, - Discovery: defaultDiscovery, - Timeout: defaultTimeout, + Discovery: defaultDiscovery, + Timeout: defaultTimeout, }, } @@ -173,17 +171,6 @@ func WithUser(user string) IdentityOption { } } -// WithCommitHandler is an optional argument to the Connect method which -// allows an alternative commit handler to be specified. The commit handler defines how -// client code should wait to receive commit events from peers following submit of a transaction. -// Currently unimplemented. -func WithCommitHandler(handler CommitHandlerFactory) Option { - return func(gw *Gateway) error { - gw.options.CommitHandler = handler - return nil - } -} - // WithDiscovery is an optional argument to the Connect method which // enables or disables service discovery for all transaction submissions for this gateway. func WithDiscovery(discovery bool) Option { diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index 47159315b5..4d6c4202ad 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -65,10 +65,6 @@ func TestConnectNoOptions(t *testing.T) { options := gw.options - if options.CommitHandler != DefaultCommitHandlers.OrgAll { - t.Fatal("DefaultCommitHandler not correctly initialized") - } - if options.Discovery != true { t.Fatal("Discovery not correctly initialized") } @@ -96,10 +92,6 @@ func TestConnectWithSDK(t *testing.T) { options := gw.options - if options.CommitHandler != DefaultCommitHandlers.OrgAll { - t.Fatal("DefaultCommitHandler not correctly initialized") - } - if options.Discovery != true { t.Fatal("Discovery not correctly initialized") } @@ -133,23 +125,6 @@ func TestConnectWithIdentity(t *testing.T) { } } -func TestConnectWithCommitHandler(t *testing.T) { - gw, err := Connect( - WithConfig(config.FromFile("testdata/connection-tls.json")), - WithUser("user1"), - WithCommitHandler(DefaultCommitHandlers.OrgAny), - ) - if err != nil { - t.Fatalf("Failed to create gateway: %s", err) - } - - options := gw.options - - if options.CommitHandler != DefaultCommitHandlers.OrgAny { - t.Fatal("CommitHandler not set correctly") - } -} - func TestConnectWithDiscovery(t *testing.T) { gw, err := Connect( WithConfig(config.FromFile("testdata/connection-tls.json")), @@ -184,28 +159,6 @@ func TestConnectWithTimout(t *testing.T) { } } -func TestConnectWithMultipleOptions(t *testing.T) { - gw, err := Connect( - WithConfig(config.FromFile("testdata/connection-tls.json")), - WithUser("user1"), - WithCommitHandler(DefaultCommitHandlers.OrgAny), - WithDiscovery(false), - ) - if err != nil { - t.Fatalf("Failed to create gateway: %s", err) - } - - options := gw.options - - if options.Discovery != false { - t.Fatal("Discovery not set correctly") - } - - if options.CommitHandler != DefaultCommitHandlers.OrgAny { - t.Fatal("CommitHandler not set correctly") - } -} - func TestGetSDK(t *testing.T) { gw, err := Connect( WithConfig(config.FromFile("testdata/connection-tls.json")), diff --git a/pkg/gateway/network.go b/pkg/gateway/network.go index 85a69fabbc..182a48fe1e 100644 --- a/pkg/gateway/network.go +++ b/pkg/gateway/network.go @@ -8,6 +8,7 @@ package gateway import ( "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" + "github.com/hyperledger/fabric-sdk-go/pkg/client/event" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/pkg/errors" @@ -20,6 +21,7 @@ type Network struct { gateway *Gateway client *channel.Client peers []fab.Peer + event *event.Client } func newNetwork(gateway *Gateway, channelProvider context.ChannelProvider) (*Network, error) { @@ -54,6 +56,11 @@ func newNetwork(gateway *Gateway, channelProvider context.ChannelProvider) (*Net n.peers = peers + n.event, err = event.New(channelProvider, event.WithBlockEvents()) + if err != nil { + return nil, errors.Wrap(err, "Failed to create new event client") + } + return &n, nil } @@ -66,3 +73,24 @@ func (n *Network) Name() string { func (n *Network) GetContract(chaincodeID string) *Contract { return newContract(n, chaincodeID, "") } + +// RegisterBlockEvent registers for block events. Unregister must be called when the registration is no longer needed. +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (n *Network) RegisterBlockEvent() (fab.Registration, <-chan *fab.BlockEvent, error) { + return n.event.RegisterBlockEvent() +} + +// RegisterFilteredBlockEvent registers for filtered block events. Unregister must be called when the registration is no longer needed. +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (n *Network) RegisterFilteredBlockEvent() (fab.Registration, <-chan *fab.FilteredBlockEvent, error) { + return n.event.RegisterFilteredBlockEvent() +} + +// Unregister removes the given registration and closes the event channel. +// Parameters: +// registration is the registration handle that was returned from RegisterBlockEvent method +func (n *Network) Unregister(registration fab.Registration) { + n.event.Unregister(registration) +} diff --git a/pkg/gateway/network_test.go b/pkg/gateway/network_test.go index fe2f745d0b..02c9e02005 100644 --- a/pkg/gateway/network_test.go +++ b/pkg/gateway/network_test.go @@ -48,6 +48,56 @@ func TestGetContract(t *testing.T) { } } +func TestBlockEvent(t *testing.T) { + + gw := &Gateway{ + options: &gatewayOptions{ + Discovery: defaultDiscovery, + Timeout: defaultTimeout, + }, + } + + c := mockChannelProvider("mychannel") + + nw, err := newNetwork(gw, c) + + if err != nil { + t.Fatalf("Failed to create network: %s", err) + } + + reg, _, err := nw.RegisterBlockEvent() + if err != nil { + t.Fatalf("Failed to register block event: %s", err) + } + + nw.Unregister(reg) +} + +func TestFilteredBlocktEvent(t *testing.T) { + + gw := &Gateway{ + options: &gatewayOptions{ + Discovery: defaultDiscovery, + Timeout: defaultTimeout, + }, + } + + c := mockChannelProvider("mychannel") + + nw, err := newNetwork(gw, c) + + if err != nil { + t.Fatalf("Failed to create network: %s", err) + } + + reg, _, err := nw.RegisterFilteredBlockEvent() + if err != nil { + t.Fatalf("Failed to register filtered block event: %s", err) + } + + nw.Unregister(reg) +} + func mockChannelProvider(channelID string) context.ChannelProvider { channelProvider := func() (context.Channel, error) { diff --git a/pkg/gateway/spi.go b/pkg/gateway/spi.go index 4d0d521daa..6779d78be9 100644 --- a/pkg/gateway/spi.go +++ b/pkg/gateway/spi.go @@ -10,18 +10,6 @@ package gateway // for implementing alternative gateway strategies, wallets, etc. // This is currently experimental and will be implemented in future user stories -// CommitHandlerFactory is currently unimplemented -type CommitHandlerFactory interface { - Create(string, Network) CommitHandler -} - -// CommitHandler is currently unimplemented -type CommitHandler interface { - StartListening() - WaitForEvents(int64) - CancelListening() -} - // WalletStore is the interface for implementations that provide backing storage for identities in a wallet. // To create create a new backing store, implement all the methods defined in this interface and provide // a factory method that wraps an instance of this in a new Wallet object. E.g: diff --git a/pkg/gateway/transaction.go b/pkg/gateway/transaction.go index cb3ce25d87..1ce45f6bb7 100644 --- a/pkg/gateway/transaction.go +++ b/pkg/gateway/transaction.go @@ -7,7 +7,11 @@ SPDX-License-Identifier: Apache-2.0 package gateway import ( + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel/invoke" + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/status" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/pkg/errors" ) @@ -24,6 +28,7 @@ type Transaction struct { contract *Contract request *channel.Request endorsingPeers []string + eventch chan *fab.TxStatusEvent } // TransactionOption functional arguments can be supplied when creating a transaction object @@ -107,7 +112,8 @@ func (txn *Transaction) Submit(args ...string) ([]byte, error) { } options = append(options, channel.WithTimeout(fab.Execute, txn.contract.network.gateway.options.Timeout)) - response, err := txn.contract.client.Execute( + response, err := txn.contract.client.InvokeHandler( + newSubmitHandler(txn.eventch), *txn.request, options..., ) @@ -117,3 +123,81 @@ func (txn *Transaction) Submit(args ...string) ([]byte, error) { return response.Payload, nil } + +// RegisterCommitEvent registers for a commit event for this transaction. +// Returns: +// the channel that is used to receive the event. The channel is closed after the event is queued. +func (txn *Transaction) RegisterCommitEvent() <-chan *fab.TxStatusEvent { + txn.eventch = make(chan *fab.TxStatusEvent, 1) + return txn.eventch +} + +func newSubmitHandler(eventch chan *fab.TxStatusEvent) invoke.Handler { + return invoke.NewSelectAndEndorseHandler( + invoke.NewEndorsementValidationHandler( + invoke.NewSignatureValidationHandler(&commitTxHandler{eventch}), + ), + ) +} + +type commitTxHandler struct { + eventch chan *fab.TxStatusEvent +} + +//Handle handles commit tx +func (c *commitTxHandler) Handle(requestContext *invoke.RequestContext, clientContext *invoke.ClientContext) { + txnID := requestContext.Response.TransactionID + + //Register Tx event + reg, statusNotifier, err := clientContext.EventService.RegisterTxStatusEvent(string(txnID)) // TODO: Change func to use TransactionID instead of string + if err != nil { + requestContext.Error = errors.Wrap(err, "error registering for TxStatus event") + return + } + defer clientContext.EventService.Unregister(reg) + _, err = createAndSendTransaction(clientContext.Transactor, requestContext.Response.Proposal, requestContext.Response.Responses) + if err != nil { + requestContext.Error = errors.Wrap(err, "CreateAndSendTransaction failed") + return + } + + select { + case txStatus := <-statusNotifier: + if c.eventch != nil { + c.eventch <- txStatus + close(c.eventch) + } + requestContext.Response.TxValidationCode = txStatus.TxValidationCode + + if txStatus.TxValidationCode != peer.TxValidationCode_VALID { + requestContext.Error = status.New(status.EventServerStatus, int32(txStatus.TxValidationCode), + "received invalid transaction", nil) + return + } + case <-requestContext.Ctx.Done(): + requestContext.Error = status.New(status.ClientStatus, status.Timeout.ToInt32(), + "Execute didn't receive block event", nil) + return + } +} + +func createAndSendTransaction(sender fab.Sender, proposal *fab.TransactionProposal, resps []*fab.TransactionProposalResponse) (*fab.TransactionResponse, error) { + + txnRequest := fab.TransactionRequest{ + Proposal: proposal, + ProposalResponses: resps, + } + + tx, err := sender.CreateTransaction(txnRequest) + if err != nil { + return nil, errors.WithMessage(err, "CreateTransaction failed") + } + + transactionResponse, err := sender.SendTransaction(tx) + if err != nil { + return nil, errors.WithMessage(err, "SendTransaction failed") + + } + + return transactionResponse, nil +} diff --git a/pkg/gateway/transaction_test.go b/pkg/gateway/transaction_test.go index ebb2368b05..84aeafcb70 100644 --- a/pkg/gateway/transaction_test.go +++ b/pkg/gateway/transaction_test.go @@ -7,16 +7,41 @@ SPDX-License-Identifier: Apache-2.0 package gateway import ( + reqContext "context" + "errors" + "net/http" + "strings" "testing" + "time" + + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel/invoke" + txnmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" + cpc "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" +) + +const ( + testTimeOut = 20 * time.Second + createAndSendError = "CreateAndSendTransaction failed" + txError = "MVCC_READ_CONFLICT" + mockError = "Mock fail" ) func TestTransactionOptions(t *testing.T) { transient := make(map[string][]byte) transient["price"] = []byte("8500") - + c := mockChannelProvider("mychannel") - gw := &Gateway{} + gw := &Gateway{ + options: &gatewayOptions{ + Discovery: defaultDiscovery, + Timeout: defaultTimeout, + }, + } nw, err := newNetwork(gw, c) @@ -27,7 +52,7 @@ func TestTransactionOptions(t *testing.T) { contr := nw.GetContract("contract1") txn, err := contr.CreateTransaction( - "txn1", + "txn1", WithTransient(transient), WithEndorsingPeers("peer1"), ) @@ -45,4 +70,215 @@ func TestTransactionOptions(t *testing.T) { if endorsers[0] != "peer1" { t.Fatalf("Incorrect endorsing peer: %s", endorsers[0]) } + + txn.Submit("arg1", "arg2") +} + +func TestCommitEvent(t *testing.T) { + c := mockChannelProvider("mychannel") + + gw := &Gateway{ + options: &gatewayOptions{ + Discovery: defaultDiscovery, + Timeout: defaultTimeout, + }, + } + + nw, err := newNetwork(gw, c) + + if err != nil { + t.Fatalf("Failed to create network: %s", err) + } + + contr := nw.GetContract("contract1") + txn, err := contr.CreateTransaction("txn1") + notifier := txn.RegisterCommitEvent() + + result, err := txn.Submit("arg1", "arg2") + + if err != nil { + t.Fatalf("Failed to submit transaction: %s", err) + } + + if string(result) != "abc" { + t.Fatalf("Incorrect transaction result: %s", result) + } + + var cEvent *fab.TxStatusEvent + select { + case cEvent = <-notifier: + t.Logf("Received commit event: %#v\n", cEvent) + case <-time.After(time.Second * 20): + t.Fatal("Did NOT receive commit event\n") + } + +} + +func TestSubmitHandlerTxCreateError(t *testing.T) { + + //Sample request + request := invoke.Request{ChaincodeID: "test", Fcn: "invoke", Args: [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("1")}} + + //Prepare context objects for handler + requestContext := prepareRequestContext(request, invoke.Opts{}, t) + + mockPeer1 := &fcmocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", + Status: 200, Payload: []byte("value")} + mockPeer2 := &fcmocks.MockPeer{MockName: "Peer2", MockURL: "http://peer2.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", + Status: 200, Payload: []byte("value1")} + + clientContext := setupChannelClientContext(nil, nil, []fab.Peer{mockPeer1, mockPeer2}, t) + + //Get commit handler + commitHandler := &commitTxHandler{} + //Perform action through handler + commitHandler.Handle(requestContext, clientContext) + if requestContext.Error == nil || !strings.Contains(requestContext.Error.Error(), createAndSendError) { + t.Fatal("Expected error: ", createAndSendError, ", Received error:", requestContext.Error.Error()) + } +} + +func TestSubmitHandlerTxSendError(t *testing.T) { + + //Sample request + request := invoke.Request{ChaincodeID: "test", Fcn: "invoke", Args: [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("1")}} + + //Prepare context objects for handler + requestContext := prepareRequestContext(request, invoke.Opts{}, t) + + mockPeer1 := &fcmocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", + Status: 200, Payload: []byte("value")} + mockPeer2 := &fcmocks.MockPeer{MockName: "Peer2", MockURL: "http://peer2.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", + Status: 200, Payload: []byte("value1")} + + clientContext := setupChannelClientContext(nil, nil, []fab.Peer{mockPeer1, mockPeer2}, t) + clientContext.Transactor = &mockTransactor{} + + //Get commit handler + commitHandler := &commitTxHandler{} + //Perform action through handler + commitHandler.Handle(requestContext, clientContext) + if requestContext.Error == nil || !strings.Contains(requestContext.Error.Error(), mockError) { + t.Fatal("Expected error: ", mockError, ", Received error:", requestContext.Error.Error()) + } +} + +func TestSubmitHandlerCommitError(t *testing.T) { + + //Sample request + request := invoke.Request{ChaincodeID: "test", Fcn: "invoke", Args: [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("1")}} + + //Prepare context objects for handler + requestContext := prepareRequestContext(request, invoke.Opts{}, t) + + mockPeer1 := &fcmocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", + Status: 200, Payload: []byte("value")} + mockPeer2 := &fcmocks.MockPeer{MockName: "Peer2", MockURL: "http://peer2.com", MockRoles: []string{}, MockCert: nil, MockMSP: "Org1MSP", + Status: 200, Payload: []byte("value1")} + + clientContext := setupChannelClientContext(nil, nil, []fab.Peer{mockPeer1, mockPeer2}, t) + + // add reponses to request context + addProposalResponse(requestContext) + clientContext.EventService.(*fcmocks.MockEventService).TxValidationCode = peer.TxValidationCode_MVCC_READ_CONFLICT + + //Get commit handler + commitHandler := &commitTxHandler{} + //Perform action through handler + commitHandler.Handle(requestContext, clientContext) + if requestContext.Error == nil { + t.Fatal("Expected error, got none") + } + if !strings.Contains(requestContext.Error.Error(), txError) { + t.Fatal("Expected error: ", txError, ", Received error:", requestContext.Error.Error()) + } + +} + +//prepareHandlerContexts prepares context objects for handlers +func prepareRequestContext(request invoke.Request, opts invoke.Opts, t *testing.T) *invoke.RequestContext { + requestContext := &invoke.RequestContext{Request: request, + Opts: opts, + Response: invoke.Response{}, + Ctx: reqContext.Background(), + } + + requestContext.Opts.Timeouts = make(map[fab.TimeoutType]time.Duration) + requestContext.Opts.Timeouts[fab.Execute] = testTimeOut + if opts.TargetFilter != nil { + requestContext.SelectionFilter = func(peer fab.Peer) bool { + return opts.TargetFilter.Accept(peer) + } + } + if opts.TargetSorter != nil { + requestContext.PeerSorter = func(peers []fab.Peer) []fab.Peer { + return opts.TargetSorter.Sort(peers) + } + } + + return requestContext +} + +func setupChannelClientContext(discErr error, selectionErr error, peers []fab.Peer, t *testing.T) *invoke.ClientContext { + membership := fcmocks.NewMockMembership() + + ctx := setupTestContext() + orderer := fcmocks.NewMockOrderer("", nil) + transactor := txnmocks.MockTransactor{ + Ctx: ctx, + ChannelID: "testChannel", + Orderers: []fab.Orderer{orderer}, + } + + return &invoke.ClientContext{ + Membership: membership, + Discovery: txnmocks.NewMockDiscoveryService(discErr), + Selection: txnmocks.NewMockSelectionService(selectionErr, peers...), + Transactor: &transactor, + EventService: fcmocks.NewMockEventService(), + } + +} + +func setupTestContext() cpc.Client { + user := mspmocks.NewMockSigningIdentity("test", "test") + ctx := fcmocks.NewMockContext(user) + return ctx +} + +func addProposalResponse(request *invoke.RequestContext) { + r1 := &fab.TransactionProposalResponse{ + Endorser: "peer 1", + Status: http.StatusOK, + ProposalResponse: &peer.ProposalResponse{Response: &peer.Response{ + Message: "test", Status: http.StatusOK, Payload: []byte("ResponsePayload")}, + Payload: []byte("ProposalPayload1"), + Endorsement: &peer.Endorsement{}, + }} + p := &fab.TransactionProposal{ + Proposal: &peer.Proposal{}, + } + request.Response = invoke.Response{ + Proposal: p, + Responses: []*fab.TransactionProposalResponse{r1}, + TxValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT, + } +} + +type mockTransactor struct{} + +func (t *mockTransactor) CreateTransaction(request fab.TransactionRequest) (*fab.Transaction, error) { + return nil, nil +} + +func (t *mockTransactor) SendTransaction(tx *fab.Transaction) (*fab.TransactionResponse, error) { + return nil, errors.New("Mock fail") +} + +func (t *mockTransactor) CreateTransactionHeader(opts ...fab.TxnHeaderOpt) (fab.TransactionHeader, error) { + return nil, nil +} + +func (t *mockTransactor) SendTransactionProposal(proposal *fab.TransactionProposal, targets []fab.ProposalProcessor) ([]*fab.TransactionProposalResponse, error) { + return nil, nil } diff --git a/pkg/gateway/wallet_test.go b/pkg/gateway/wallet_test.go index ad7e520678..87c5c168e0 100644 --- a/pkg/gateway/wallet_test.go +++ b/pkg/gateway/wallet_test.go @@ -10,6 +10,8 @@ import ( "reflect" "sort" "testing" + + "github.com/pkg/errors" ) type walletGenerator = func() (*Wallet, error) @@ -26,6 +28,7 @@ func testWalletSuite(t *testing.T, gen walletGenerator) { {"testContentsOfWallet", testContentsOfWallet}, {"testRemovalFromWallet", testRemovalFromWallet}, {"testRemoveNonExist", testRemoveNonExist}, + {"testPutInvalidID", testPutInvalidID}, } for _, test := range tests { t.Run(test.title, func(t *testing.T) { @@ -106,3 +109,58 @@ func testRemoveNonExist(t *testing.T, wallet *Wallet) { t.Fatal("Remove should not throw error for non-existant label") } } + +func testPutInvalidID(t *testing.T, wallet *Wallet) { + err := wallet.Put("label4", &badIdentity{}) + if err == nil { + t.Fatal("Put should throw error for bad identity") + } +} + +func TestGetFromCorruptWallet(t *testing.T) { + wallet := &Wallet{&corruptWallet{}} + _, err := wallet.Get("user") + if err == nil { + t.Fatalf("Get should throw error for corrupt entry") + } +} + +type badIdentity struct{} + +func (id *badIdentity) idType() string { + return "bad" +} + +func (id *badIdentity) mspID() string { + return "mspid" +} + +func (id *badIdentity) toJSON() ([]byte, error) { + return nil, errors.New("toJSON error") +} + +func (id *badIdentity) fromJSON(data []byte) (Identity, error) { + return nil, errors.New("fromJSON error") +} + +type corruptWallet struct{} + +func (cw *corruptWallet) Put(label string, stream []byte) error { + return nil +} + +func (cw *corruptWallet) Get(label string) ([]byte, error) { + return []byte("{\"type\":\"X.509\",\"credentials\":\"corrupt\"}"), nil +} + +func (cw *corruptWallet) List() ([]string, error) { + return nil, nil +} + +func (cw *corruptWallet) Exists(label string) bool { + return false +} + +func (cw *corruptWallet) Remove(label string) error { + return nil +} diff --git a/test/integration/pkg/gateway/gateway.go b/test/integration/pkg/gateway/gateway.go index 8976eaedd6..0695b4a3d5 100644 --- a/test/integration/pkg/gateway/gateway.go +++ b/test/integration/pkg/gateway/gateway.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/core/config" "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" "github.com/hyperledger/fabric-sdk-go/pkg/gateway" @@ -120,6 +121,73 @@ func RunWithTransient(t *testing.T) { testTransientData(contract, t) } +// RunWithBlockEvent tests receiving block events +func RunWithBlockEvent(t *testing.T) { + configPath := integration.GetConfigPath("config_e2e.yaml") + + gw, err := gateway.Connect( + gateway.WithConfig(config.FromFile(configPath)), + gateway.WithUser("User1"), + ) + if err != nil { + t.Fatalf("Failed to create new Gateway: %s", err) + } + defer gw.Close() + + nw, err := gw.GetNetwork(channelID) + if err != nil { + t.Fatalf("Failed to get network: %s", err) + } + + _, notifier1, err := nw.RegisterBlockEvent() + if err != nil { + t.Fatalf("Failed to register block event: %s", err) + } + + _, notifier2, err := nw.RegisterFilteredBlockEvent() + if err != nil { + t.Fatalf("Failed to register filtered block event: %s", err) + } + + contract := nw.GetContract(ccID) + runContract(contract, t) + + var bEvent *fab.BlockEvent + var fEvent *fab.FilteredBlockEvent + for i := 0; i < 2; i++ { + select { + case bEvent = <-notifier1: + t.Logf("Received block event: %#v\n", bEvent) + case fEvent = <-notifier2: + t.Logf("Received filtered block event: %#v\n", fEvent) + case <-time.After(time.Second * 20): + t.Fatal("Did NOT receive block event\n") + } + } +} + +// RunWithContractEvent tests receiving contract events +func RunWithContractEvent(t *testing.T) { + configPath := integration.GetConfigPath("config_e2e.yaml") + + gw, err := gateway.Connect( + gateway.WithConfig(config.FromFile(configPath)), + gateway.WithUser("User1"), + ) + if err != nil { + t.Fatalf("Failed to create new Gateway: %s", err) + } + defer gw.Close() + + nw, err := gw.GetNetwork(channelID) + if err != nil { + t.Fatalf("Failed to get network: %s", err) + } + + contract := nw.GetContract(ccID) + testContractEvent(contract, t) +} + func testGateway(gw *gateway.Gateway, t *testing.T) { nw, err := gw.GetNetwork(channelID) if err != nil { @@ -180,6 +248,8 @@ func testTransientData(contract *gateway.Contract, t *testing.T) { t.Fatalf("Failed to create transaction: %s", err) } + notifier := txn.RegisterCommitEvent() + result, err := txn.Submit("move", "a", "b", "1") if err != nil { t.Fatalf("Failed to submit transaction: %s", err) @@ -188,6 +258,43 @@ func testTransientData(contract *gateway.Contract, t *testing.T) { if string(result) != "8500" { t.Fatalf("Incorrect result: %s", string(result)) } + + var cEvent *fab.TxStatusEvent + select { + case cEvent = <-notifier: + t.Logf("Received commit event: %#v\n", cEvent) + case <-time.After(time.Second * 20): + t.Fatal("Did NOT receive commit event\n") + } + +} + +func testContractEvent(contract *gateway.Contract, t *testing.T) { + eventID := "test([a-zA-Z]+)" + + reg, notifier, err := contract.RegisterEvent(eventID) + if err != nil { + t.Fatalf("Failed to register contract event: %s", err) + } + defer contract.Unregister(reg) + + _, err = contract.SubmitTransaction("invoke", "move", "a", "b", "1") + if err != nil { + t.Fatalf("Failed to submit transaction: %s", err) + } + + var ccEvent *fab.CCEvent + select { + case ccEvent = <-notifier: + t.Logf("Received CC event: %#v\n", ccEvent) + payload := string(ccEvent.Payload) + if payload != "Test Payload" { + t.Fatalf("Received incorrect event payload: %s", payload) + } + case <-time.After(time.Second * 20): + t.Fatalf("Did NOT receive CC event for eventId(%s)\n", eventID) + } + } func populateWallet(wallet *gateway.Wallet) error { diff --git a/test/integration/pkg/gateway/gateway_test.go b/test/integration/pkg/gateway/gateway_test.go index a3f2258b96..782a2c65dd 100644 --- a/test/integration/pkg/gateway/gateway_test.go +++ b/test/integration/pkg/gateway/gateway_test.go @@ -33,3 +33,15 @@ func TestTransientData(t *testing.T) { RunWithTransient(t) }) } + +func TestContractEvent(t *testing.T) { + t.Run("Base", func(t *testing.T) { + RunWithContractEvent(t) + }) +} + +func TestBlockEvent(t *testing.T) { + t.Run("Base", func(t *testing.T) { + RunWithBlockEvent(t) + }) +}