Skip to content

Commit

Permalink
Merge branch 'main' into setClosed
Browse files Browse the repository at this point in the history
  • Loading branch information
pfi79 authored Oct 18, 2022
2 parents 6586049 + 965abe3 commit fae0d70
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 11 deletions.
2 changes: 1 addition & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
| Aleksandar Likic | alikic | [email protected] |
| Bob Stasyszyn | bstasyszyn | [email protected] |
| Firas.Qutishat | fqutishat | [email protected] |
| Gari Singh | mastersingh24 | [email protected] |
| Troy Ronda | troyronda | [email protected] |

### Retired Maintainers

| Name | GitHub | email |
|---|---|---|
| Jim Zhang | jimthematrix | [email protected] |
| Gari Singh | mastersingh24 | [email protected] |

<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>
1 change: 1 addition & 0 deletions pkg/client/channel/chclientrun_std.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !pprof
// +build !pprof

/*
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type Client struct {
permitBlockEvents bool
fromBlock uint64
seekType seek.Type
chaincodeID string
eventConsumerTimeout *time.Duration
}

// New returns a Client instance. Client receives events such as block, filtered block,
// chaincode, and transaction status events.
// nolint: gocyclo
func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) {

channelContext, err := channelProvider()
Expand Down Expand Up @@ -68,6 +70,9 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock))
}
}
if eventClient.chaincodeID != "" {
opts = append(opts, deliverclient.WithChaincodeID(eventClient.chaincodeID))
}
if eventClient.eventConsumerTimeout != nil {
opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout))
}
Expand Down
20 changes: 18 additions & 2 deletions pkg/client/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
pb "github.com/hyperledger/fabric-protos-go/peer"
)

var (
Expand All @@ -43,7 +43,7 @@ func TestNewEventClient(t *testing.T) {
t.Fatalf("Failed to create new event client: %s", err)
}

_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500 * time.Millisecond))
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500*time.Millisecond), WithChaincodeID("testChaincode"))
if err != nil {
t.Fatalf("Failed to create new event client: %s", err)
}
Expand All @@ -55,6 +55,22 @@ func TestNewEventClient(t *testing.T) {
}
}

func TestNewEventClientWithFromBlock(t *testing.T) {

fabCtx := setupCustomTestContext(t, nil)
ctx := createChannelContext(fabCtx, channelID)

_, err := New(ctx)
if err != nil {
t.Fatalf("Failed to create new event client: %s", err)
}

_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.FromBlock), WithBlockNum(100), WithChaincodeID("testChaincode"))
if err != nil {
t.Fatalf("Failed to create new event client: %s", err)
}
}

func TestBlockEvents(t *testing.T) {

eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withBlockLedger(sourceURL))
Expand Down
19 changes: 19 additions & 0 deletions pkg/client/event/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,25 @@ func ExampleClient_RegisterChaincodeEvent() {

}

func ExampleClient_RegisterChaincodeEvent_NewService() {

ec, err := New(mockChannelProvider("mychannel"), WithChaincodeID("examplecc"))
if err != nil {
fmt.Println("failed to create client")
}

registration, _, err := ec.RegisterChaincodeEvent("examplecc", "event123")
if err != nil {
fmt.Println("failed to register chaincode event")
}
defer ec.Unregister(registration)

fmt.Println("chaincode event registered successfully")

// Output: chaincode event registered successfully

}

func ExampleClient_RegisterChaincodeEvent_withPayload() {

// If you require payload for chaincode events you have to use WithBlockEvents() option
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/event/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ func WithSeekType(seek seek.Type) ClientOption {
}
}

// WithChaincodeID indicates the target chaincode
// Only deliverclient supports this
func WithChaincodeID(id string) ClientOption {
return func(c *Client) error {
c.chaincodeID = id
return nil
}
}

// WithEventConsumerTimeout is the timeout when sending events to a registered consumer.
// If < 0, if buffer full, unblocks immediately and does not send.
// If 0, if buffer full, will block and guarantee the event will be sent out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,19 @@ func TestOpts(t *testing.T) {
assert.Equal(t, 9, params.reconnectBlockHeightLagThreshold)
})

t.Run("No filter", func(t *testing.T) {
policy := fab.EventServicePolicy{
BlockHeightLagThreshold: -1,
MinBlockHeightResolverMode: fab.ResolveByThreshold,
}
config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{
Policies: fab.ChannelPolicies{
EventService: policy,
},
})

params := defaultParams(context, channelID)
require.NotNil(t, params)
assert.Equal(t, -1, params.blockHeightLagThreshold)
})
}
4 changes: 0 additions & 4 deletions pkg/fab/events/client/peerresolver/minblockheight/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,6 @@ func getBlockHeightLagThreshold(policy fab.EventServicePolicy) int {
threshold = 0
case fab.ResolveByThreshold:
threshold = policy.BlockHeightLagThreshold
if threshold <= 0 {
logger.Warnf("Invalid BlockHeightLagThreshold: %d. Using default: %d", threshold, defaultBlockHeightLagThreshold)
threshold = defaultBlockHeightLagThreshold
}
default:
logger.Warnf("Invalid MinBlockHeightResolverMode: [%s]. Using default: [%s]", policy.MinBlockHeightResolverMode, fab.ResolveByThreshold)
threshold = policy.BlockHeightLagThreshold
Expand Down
1 change: 1 addition & 0 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestClientConnect(t *testing.T) {
),
WithSeekType(seek.FromBlock),
WithBlockNum(0),
WithChaincodeID("testChaincode"),
client.WithResponseTimeout(3*time.Second),
)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/fab/events/deliverclient/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type params struct {
connProvider api.ConnectionProvider
seekType seek.Type
fromBlock uint64
chaincodeID string
respTimeout time.Duration
}

Expand Down Expand Up @@ -48,6 +49,15 @@ func WithBlockNum(value uint64) options.Opt {
}
}

// WithChaincodeID specifies the chaincode from which events are to be received.
func WithChaincodeID(value string) options.Opt {
return func(p options.Params) {
if setter, ok := p.(chaincodeIDSetter); ok {
setter.SetChaincodeID(value)
}
}
}

type seekTypeSetter interface {
SetSeekType(value seek.Type)
}
Expand All @@ -56,6 +66,10 @@ type fromBlockSetter interface {
SetFromBlock(value uint64)
}

type chaincodeIDSetter interface {
SetChaincodeID(value string)
}

func (p *params) PermitBlockEvents() {
logger.Debug("PermitBlockEvents")
p.connProvider = deliverProvider
Expand All @@ -79,6 +93,11 @@ func (p *params) SetSeekType(value seek.Type) {
}
}

func (p *params) SetChaincodeID(value string) {
logger.Debugf("ChaincodId: %d", value)
p.chaincodeID = value
}

func (p *params) SetResponseTimeout(value time.Duration) {
logger.Debugf("ResponseTimeout: %s", value)
p.respTimeout = value
Expand Down
1 change: 1 addition & 0 deletions pkg/fabsdk/fabsdk_std.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !pprof
// +build !pprof

/*
Expand Down
24 changes: 22 additions & 2 deletions pkg/fabsdk/provider/chpvdr/cachekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ package chpvdr

import (
"crypto/sha256"
"strconv"
"fmt"

"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
)

// ctxtCacheKey is a lazy cache key for the context cache
Expand Down Expand Up @@ -99,6 +100,9 @@ func (k *eventCacheKey) String() string {

type params struct {
permitBlockEvents bool
seekType seek.Type
fromBlock uint64
chaincodeID string
}

func defaultParams() *params {
Expand All @@ -109,8 +113,24 @@ func (p *params) PermitBlockEvents() {
p.permitBlockEvents = true
}

func (p *params) SetFromBlock(value uint64) {
p.fromBlock = value
}

func (p *params) SetSeekType(value seek.Type) {
if value != "" {
p.seekType = value
}
}

func (p *params) SetChaincodeID(value string) {
if value != "" {
p.chaincodeID = value
}
}

func (p *params) getOptKey() string {
// Construct opts portion
optKey := "blockEvents:" + strconv.FormatBool(p.permitBlockEvents)
optKey := fmt.Sprintf("blockEvents:%t,seekType:%s,fromBlock:%d,chaincodeId:%s", p.permitBlockEvents, p.seekType, p.fromBlock, p.chaincodeID)
return optKey
}
5 changes: 4 additions & 1 deletion pkg/fabsdk/provider/chpvdr/chprovider_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build testing
// +build testing

/*
Expand All @@ -24,6 +25,8 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/fab/chconfig"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
"github.com/pkg/errors"
Expand Down Expand Up @@ -83,7 +86,7 @@ func TestBasicValidChannel(t *testing.T) {
assert.NotNil(t, channelConfig)
assert.NotEmptyf(t, channelConfig.ID(), "Got empty channel ID from channel config")

eventService, err := channelService.EventService()
eventService, err := channelService.EventService(client.WithBlockEvents(), deliverclient.WithSeekType("from"), deliverclient.WithBlockNum(10), deliverclient.WithChaincodeID("testChaincode"))
require.NoError(t, err)
require.NotNil(t, eventService)

Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/filesystemwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewFileSystemWallet(path string) (*Wallet, error) {
}

store := &fileSystemWalletStore{cleanPath}
return &Wallet{store}, nil
return NewWalletWithStore(store), nil

}

Expand Down
11 changes: 11 additions & 0 deletions pkg/gateway/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Transaction struct {
request *channel.Request
endorsingPeers []string
collections []string
isInit bool
eventch chan *fab.TxStatusEvent
}

Expand Down Expand Up @@ -83,6 +84,15 @@ func WithCollections(collections ...string) TransactionOption {
}
}

// WithInit is an option that makes the transaction fulfill the --init-required condition before the chaincode
// can be invoked to process regular transactions
func WithInit() TransactionOption {
return func(txn *Transaction) error {
txn.isInit = true
return nil
}
}

// Evaluate a transaction function and return its results.
// The transaction function will be evaluated on the endorsing peers but
// the responses will not be sent to the ordering service and hence will
Expand Down Expand Up @@ -124,6 +134,7 @@ func (txn *Transaction) Submit(args ...string) ([]byte, error) {
bytes[i] = []byte(v)
}
txn.request.Args = bytes
txn.request.IsInit = txn.isInit

var options []channel.RequestOption
if txn.endorsingPeers != nil {
Expand Down
51 changes: 51 additions & 0 deletions pkg/gateway/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,57 @@ func TestTransactionOptions(t *testing.T) {
txn.Submit("arg1", "arg2")
}

func TestInitTransactionOptions(t *testing.T) {
transient := make(map[string][]byte)
transient["price"] = []byte("8500")

c := mockChannelProvider("mychannel")

gw := &Gateway{
options: &gatewayOptions{
Timeout: defaultTimeout,
},
}

nw, err := newNetwork(gw, c)

if err != nil {
t.Fatalf("Failed to create network: %s", err)
}

contr := nw.GetContract("contract1")

txn, err := contr.CreateTransaction(
"txn1",
WithTransient(transient),
WithEndorsingPeers("peer1"),
WithCollections("_implicit_org_org1"),
WithInit(),
)

if err != nil {
t.Fatalf("Failed to create transaction: %s", err)
}

data := txn.request.TransientMap["price"]
if string(data) != "8500" {
t.Fatalf("Incorrect transient data: %s", string(data))
}

endorsers := txn.endorsingPeers
if endorsers[0] != "peer1" {
t.Fatalf("Incorrect endorsing peer: %s", endorsers[0])
}

collections := txn.collections
if collections[0] != "_implicit_org_org1" {
t.Fatalf("Incorrect collection: %s", collections[0])
}

txn.Evaluate("arg1", "arg2")
txn.Submit("arg1", "arg2")
}

func TestCommitEvent(t *testing.T) {
c := mockChannelProvider("mychannel")

Expand Down
Loading

0 comments on commit fae0d70

Please sign in to comment.