diff --git a/MAINTAINERS.md b/MAINTAINERS.md index ff56e9e5df..4cd858ea72 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -8,7 +8,6 @@ | Aleksandar Likic | alikic | aleksandar.likic@securekey.com | | Bob Stasyszyn | bstasyszyn | bob.stasyszyn@securekey.com | | Firas.Qutishat | fqutishat | firas.qutishat@securekey.com | -| Gari Singh | mastersingh24 | gari.r.singh@gmail.com | | Troy Ronda | troyronda | troy@troyronda.com | ### Retired Maintainers @@ -16,5 +15,6 @@ | Name | GitHub | email | |---|---|---| | Jim Zhang | jimthematrix | jim_the_matrix@hotmail.com | +| Gari Singh | mastersingh24 | gari.r.singh@gmail.com | Creative Commons License
This work is licensed under a Creative Commons Attribution 4.0 International License diff --git a/pkg/client/channel/chclientrun_std.go b/pkg/client/channel/chclientrun_std.go index 3f2a459a44..7ae0b32f2e 100644 --- a/pkg/client/channel/chclientrun_std.go +++ b/pkg/client/channel/chclientrun_std.go @@ -1,3 +1,4 @@ +//go:build !pprof // +build !pprof /* diff --git a/pkg/client/event/event.go b/pkg/client/event/event.go index 0062d99206..bf7370a0b1 100644 --- a/pkg/client/event/event.go +++ b/pkg/client/event/event.go @@ -33,11 +33,13 @@ type Client struct { permitBlockEvents bool fromBlock uint64 seekType seek.Type + chaincodeID string eventConsumerTimeout *time.Duration } // New returns a Client instance. Client receives events such as block, filtered block, // chaincode, and transaction status events. +// nolint: gocyclo func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) { channelContext, err := channelProvider() @@ -68,6 +70,9 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock)) } } + if eventClient.chaincodeID != "" { + opts = append(opts, deliverclient.WithChaincodeID(eventClient.chaincodeID)) + } if eventClient.eventConsumerTimeout != nil { opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout)) } diff --git a/pkg/client/event/event_test.go b/pkg/client/event/event_test.go index d45ce5d1c4..1e34b5b728 100644 --- a/pkg/client/event/event_test.go +++ b/pkg/client/event/event_test.go @@ -20,11 +20,11 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" + pb "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" - pb "github.com/hyperledger/fabric-protos-go/peer" ) var ( @@ -43,7 +43,7 @@ func TestNewEventClient(t *testing.T) { t.Fatalf("Failed to create new event client: %s", err) } - _, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500 * time.Millisecond)) + _, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500*time.Millisecond), WithChaincodeID("testChaincode")) if err != nil { t.Fatalf("Failed to create new event client: %s", err) } @@ -55,6 +55,22 @@ func TestNewEventClient(t *testing.T) { } } +func TestNewEventClientWithFromBlock(t *testing.T) { + + fabCtx := setupCustomTestContext(t, nil) + ctx := createChannelContext(fabCtx, channelID) + + _, err := New(ctx) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + _, err = New(ctx, WithBlockEvents(), WithSeekType(seek.FromBlock), WithBlockNum(100), WithChaincodeID("testChaincode")) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } +} + func TestBlockEvents(t *testing.T) { eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withBlockLedger(sourceURL)) diff --git a/pkg/client/event/example_test.go b/pkg/client/event/example_test.go index 098b0464f4..6990c01a0f 100644 --- a/pkg/client/event/example_test.go +++ b/pkg/client/event/example_test.go @@ -77,6 +77,25 @@ func ExampleClient_RegisterChaincodeEvent() { } +func ExampleClient_RegisterChaincodeEvent_NewService() { + + ec, err := New(mockChannelProvider("mychannel"), WithChaincodeID("examplecc")) + if err != nil { + fmt.Println("failed to create client") + } + + registration, _, err := ec.RegisterChaincodeEvent("examplecc", "event123") + if err != nil { + fmt.Println("failed to register chaincode event") + } + defer ec.Unregister(registration) + + fmt.Println("chaincode event registered successfully") + + // Output: chaincode event registered successfully + +} + func ExampleClient_RegisterChaincodeEvent_withPayload() { // If you require payload for chaincode events you have to use WithBlockEvents() option diff --git a/pkg/client/event/opts.go b/pkg/client/event/opts.go index 5803fabc56..28d95e4720 100644 --- a/pkg/client/event/opts.go +++ b/pkg/client/event/opts.go @@ -42,6 +42,15 @@ func WithSeekType(seek seek.Type) ClientOption { } } +// WithChaincodeID indicates the target chaincode +// Only deliverclient supports this +func WithChaincodeID(id string) ClientOption { + return func(c *Client) error { + c.chaincodeID = id + return nil + } +} + // WithEventConsumerTimeout is the timeout when sending events to a registered consumer. // If < 0, if buffer full, unblocks immediately and does not send. // If 0, if buffer full, will block and guarantee the event will be sent out. diff --git a/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go index 3893b9577f..649e222fd6 100644 --- a/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go +++ b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go @@ -169,4 +169,19 @@ func TestOpts(t *testing.T) { assert.Equal(t, 9, params.reconnectBlockHeightLagThreshold) }) + t.Run("No filter", func(t *testing.T) { + policy := fab.EventServicePolicy{ + BlockHeightLagThreshold: -1, + MinBlockHeightResolverMode: fab.ResolveByThreshold, + } + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: policy, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + assert.Equal(t, -1, params.blockHeightLagThreshold) + }) } diff --git a/pkg/fab/events/client/peerresolver/minblockheight/opts.go b/pkg/fab/events/client/peerresolver/minblockheight/opts.go index 6de30a4fa9..84b637ff4b 100644 --- a/pkg/fab/events/client/peerresolver/minblockheight/opts.go +++ b/pkg/fab/events/client/peerresolver/minblockheight/opts.go @@ -104,10 +104,6 @@ func getBlockHeightLagThreshold(policy fab.EventServicePolicy) int { threshold = 0 case fab.ResolveByThreshold: threshold = policy.BlockHeightLagThreshold - if threshold <= 0 { - logger.Warnf("Invalid BlockHeightLagThreshold: %d. Using default: %d", threshold, defaultBlockHeightLagThreshold) - threshold = defaultBlockHeightLagThreshold - } default: logger.Warnf("Invalid MinBlockHeightResolverMode: [%s]. Using default: [%s]", policy.MinBlockHeightResolverMode, fab.ResolveByThreshold) threshold = policy.BlockHeightLagThreshold diff --git a/pkg/fab/events/deliverclient/deliverclient_test.go b/pkg/fab/events/deliverclient/deliverclient_test.go index d7f965fc1c..20259aee8f 100755 --- a/pkg/fab/events/deliverclient/deliverclient_test.go +++ b/pkg/fab/events/deliverclient/deliverclient_test.go @@ -72,6 +72,7 @@ func TestClientConnect(t *testing.T) { ), WithSeekType(seek.FromBlock), WithBlockNum(0), + WithChaincodeID("testChaincode"), client.WithResponseTimeout(3*time.Second), ) if err != nil { diff --git a/pkg/fab/events/deliverclient/opts.go b/pkg/fab/events/deliverclient/opts.go index f8c7e80565..634571fabc 100755 --- a/pkg/fab/events/deliverclient/opts.go +++ b/pkg/fab/events/deliverclient/opts.go @@ -19,6 +19,7 @@ type params struct { connProvider api.ConnectionProvider seekType seek.Type fromBlock uint64 + chaincodeID string respTimeout time.Duration } @@ -48,6 +49,15 @@ func WithBlockNum(value uint64) options.Opt { } } +// WithChaincodeID specifies the chaincode from which events are to be received. +func WithChaincodeID(value string) options.Opt { + return func(p options.Params) { + if setter, ok := p.(chaincodeIDSetter); ok { + setter.SetChaincodeID(value) + } + } +} + type seekTypeSetter interface { SetSeekType(value seek.Type) } @@ -56,6 +66,10 @@ type fromBlockSetter interface { SetFromBlock(value uint64) } +type chaincodeIDSetter interface { + SetChaincodeID(value string) +} + func (p *params) PermitBlockEvents() { logger.Debug("PermitBlockEvents") p.connProvider = deliverProvider @@ -79,6 +93,11 @@ func (p *params) SetSeekType(value seek.Type) { } } +func (p *params) SetChaincodeID(value string) { + logger.Debugf("ChaincodId: %d", value) + p.chaincodeID = value +} + func (p *params) SetResponseTimeout(value time.Duration) { logger.Debugf("ResponseTimeout: %s", value) p.respTimeout = value diff --git a/pkg/fabsdk/fabsdk_std.go b/pkg/fabsdk/fabsdk_std.go index 7a70221bb0..17edff4cb6 100644 --- a/pkg/fabsdk/fabsdk_std.go +++ b/pkg/fabsdk/fabsdk_std.go @@ -1,3 +1,4 @@ +//go:build !pprof // +build !pprof /* diff --git a/pkg/fabsdk/provider/chpvdr/cachekey.go b/pkg/fabsdk/provider/chpvdr/cachekey.go index 4326a4b4b4..b5a92d6de8 100644 --- a/pkg/fabsdk/provider/chpvdr/cachekey.go +++ b/pkg/fabsdk/provider/chpvdr/cachekey.go @@ -8,10 +8,11 @@ package chpvdr import ( "crypto/sha256" - "strconv" + "fmt" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek" ) // ctxtCacheKey is a lazy cache key for the context cache @@ -99,6 +100,9 @@ func (k *eventCacheKey) String() string { type params struct { permitBlockEvents bool + seekType seek.Type + fromBlock uint64 + chaincodeID string } func defaultParams() *params { @@ -109,8 +113,24 @@ func (p *params) PermitBlockEvents() { p.permitBlockEvents = true } +func (p *params) SetFromBlock(value uint64) { + p.fromBlock = value +} + +func (p *params) SetSeekType(value seek.Type) { + if value != "" { + p.seekType = value + } +} + +func (p *params) SetChaincodeID(value string) { + if value != "" { + p.chaincodeID = value + } +} + func (p *params) getOptKey() string { // Construct opts portion - optKey := "blockEvents:" + strconv.FormatBool(p.permitBlockEvents) + optKey := fmt.Sprintf("blockEvents:%t,seekType:%s,fromBlock:%d,chaincodeId:%s", p.permitBlockEvents, p.seekType, p.fromBlock, p.chaincodeID) return optKey } diff --git a/pkg/fabsdk/provider/chpvdr/chprovider_test.go b/pkg/fabsdk/provider/chpvdr/chprovider_test.go index 0fcaf5b88c..6a7fa89314 100644 --- a/pkg/fabsdk/provider/chpvdr/chprovider_test.go +++ b/pkg/fabsdk/provider/chpvdr/chprovider_test.go @@ -1,3 +1,4 @@ +//go:build testing // +build testing /* @@ -24,6 +25,8 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/fab/chconfig" "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery" discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient" "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" "github.com/pkg/errors" @@ -83,7 +86,7 @@ func TestBasicValidChannel(t *testing.T) { assert.NotNil(t, channelConfig) assert.NotEmptyf(t, channelConfig.ID(), "Got empty channel ID from channel config") - eventService, err := channelService.EventService() + eventService, err := channelService.EventService(client.WithBlockEvents(), deliverclient.WithSeekType("from"), deliverclient.WithBlockNum(10), deliverclient.WithChaincodeID("testChaincode")) require.NoError(t, err) require.NotNil(t, eventService) diff --git a/pkg/gateway/filesystemwallet.go b/pkg/gateway/filesystemwallet.go index e0df9a9749..4e87b4e4a3 100644 --- a/pkg/gateway/filesystemwallet.go +++ b/pkg/gateway/filesystemwallet.go @@ -37,7 +37,7 @@ func NewFileSystemWallet(path string) (*Wallet, error) { } store := &fileSystemWalletStore{cleanPath} - return &Wallet{store}, nil + return NewWalletWithStore(store), nil } diff --git a/pkg/gateway/transaction.go b/pkg/gateway/transaction.go index 1836eba403..ce80cbe882 100644 --- a/pkg/gateway/transaction.go +++ b/pkg/gateway/transaction.go @@ -29,6 +29,7 @@ type Transaction struct { request *channel.Request endorsingPeers []string collections []string + isInit bool eventch chan *fab.TxStatusEvent } @@ -83,6 +84,15 @@ func WithCollections(collections ...string) TransactionOption { } } +// WithInit is an option that makes the transaction fulfill the --init-required condition before the chaincode +// can be invoked to process regular transactions +func WithInit() TransactionOption { + return func(txn *Transaction) error { + txn.isInit = true + return nil + } +} + // Evaluate a transaction function and return its results. // The transaction function will be evaluated on the endorsing peers but // the responses will not be sent to the ordering service and hence will @@ -124,6 +134,7 @@ func (txn *Transaction) Submit(args ...string) ([]byte, error) { bytes[i] = []byte(v) } txn.request.Args = bytes + txn.request.IsInit = txn.isInit var options []channel.RequestOption if txn.endorsingPeers != nil { diff --git a/pkg/gateway/transaction_test.go b/pkg/gateway/transaction_test.go index cb9f61673f..bc60e69234 100644 --- a/pkg/gateway/transaction_test.go +++ b/pkg/gateway/transaction_test.go @@ -80,6 +80,57 @@ func TestTransactionOptions(t *testing.T) { txn.Submit("arg1", "arg2") } +func TestInitTransactionOptions(t *testing.T) { + transient := make(map[string][]byte) + transient["price"] = []byte("8500") + + c := mockChannelProvider("mychannel") + + gw := &Gateway{ + options: &gatewayOptions{ + Timeout: defaultTimeout, + }, + } + + nw, err := newNetwork(gw, c) + + if err != nil { + t.Fatalf("Failed to create network: %s", err) + } + + contr := nw.GetContract("contract1") + + txn, err := contr.CreateTransaction( + "txn1", + WithTransient(transient), + WithEndorsingPeers("peer1"), + WithCollections("_implicit_org_org1"), + WithInit(), + ) + + if err != nil { + t.Fatalf("Failed to create transaction: %s", err) + } + + data := txn.request.TransientMap["price"] + if string(data) != "8500" { + t.Fatalf("Incorrect transient data: %s", string(data)) + } + + endorsers := txn.endorsingPeers + if endorsers[0] != "peer1" { + t.Fatalf("Incorrect endorsing peer: %s", endorsers[0]) + } + + collections := txn.collections + if collections[0] != "_implicit_org_org1" { + t.Fatalf("Incorrect collection: %s", collections[0]) + } + + txn.Evaluate("arg1", "arg2") + txn.Submit("arg1", "arg2") +} + func TestCommitEvent(t *testing.T) { c := mockChannelProvider("mychannel") diff --git a/pkg/gateway/wallet.go b/pkg/gateway/wallet.go index 2536e3e140..485d2b2c71 100644 --- a/pkg/gateway/wallet.go +++ b/pkg/gateway/wallet.go @@ -25,6 +25,16 @@ type wallet interface { List() ([]string, error) } +// NewWalletWithStore is used for creating wallets with custom walletstores as backends. +// Parameters: +// store is the WalletStore you want to use as the backend for the wallet. +// +// Returns: +// The Wallet object. +func NewWalletWithStore(store WalletStore) *Wallet { + return &Wallet{store: store} +} + // A Wallet stores identity information used to connect to a Hyperledger Fabric network. // Instances are created using factory methods on the implementing objects. type Wallet struct {