Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFT Block Puller: updatable connection source #4571

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions common/deliverclient/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we have any util package in our entire codebase we can move this function to?

Copy link
Contributor Author

@tock-ibm tock-ibm Jan 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It got promoted from orderer/common/cluster/util.go to common/deliverclient/util.go, where I plan to put all the stuff that is common to both orderer and peer in the context of the block puller. Can't put it in package common/util nor protoutil because of import cycles. I am open to suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose the import cycle is because of github.com/hyperledger/fabric/common/configtx? Then can't you move it there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However if you think there are additional things to be placed here, I guess we can leave it as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of import cycles it can be placed in common/configtx, but I think it is not a good place to put code that parses blocks, as that package deals only with individual txs. I prefer to leave it where it is for the time being. I think that during the refactoring that is about to happen when we overhaul the orderer, several additional things will get moved into that util.go file.

Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package deliverclient

import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)

var ErrNotAConfig = errors.New("not a config block")

// ConfigFromBlock returns a ConfigEnvelope if exists, or a *ErrNotAConfig error.
// It may also return some other error in case parsing failed.
func ConfigFromBlock(block *common.Block) (*common.ConfigEnvelope, error) {
if block == nil || block.Data == nil || len(block.Data.Data) == 0 {
return nil, errors.New("empty block")
}
txn := block.Data.Data[0]
env, err := protoutil.GetEnvelopeFromBlock(txn)
if err != nil {
return nil, errors.WithStack(err)
}
payload, err := protoutil.UnmarshalPayload(env.Payload)
if err != nil {
return nil, errors.WithStack(err)
}
if block.Header.Number == 0 {
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.Wrap(err, "invalid config envelope")
}
return configEnvelope, nil
}
if payload.Header == nil {
return nil, errors.New("nil header in payload")
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, errors.WithStack(err)
}
if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG {
return nil, ErrNotAConfig
}
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.Wrap(err, "invalid config envelope")
}
return configEnvelope, nil
}
106 changes: 106 additions & 0 deletions common/deliverclient/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package deliverclient_test

import (
"testing"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/deliverclient"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

func TestConfigFromBlockBadInput(t *testing.T) {
for _, testCase := range []struct {
name string
block *common.Block
expectedError string
}{
{
name: "nil block",
expectedError: "empty block",
block: nil,
},
{
name: "nil block data",
expectedError: "empty block",
block: &common.Block{},
},
{
name: "no data in block",
expectedError: "empty block",
block: &common.Block{Data: &common.BlockData{}},
},
{
name: "invalid payload",
expectedError: "error unmarshalling Envelope",
block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}},
},
{
name: "bad genesis block",
expectedError: "invalid config envelope",
block: &common.Block{
Header: &common.BlockHeader{}, Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Data: []byte{1, 2, 3},
}),
})}},
},
},
{
name: "invalid envelope in block",
expectedError: "error unmarshalling Envelope",
block: &common.Block{Data: &common.BlockData{Data: [][]byte{{1, 2, 3}}}},
},
{
name: "invalid payload in block envelope",
expectedError: "error unmarshalling Payload",
block: &common.Block{Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: []byte{1, 2, 3},
})}}},
},
{
name: "invalid channel header",
expectedError: "error unmarshalling ChannelHeader",
block: &common.Block{
Header: &common.BlockHeader{Number: 1},
Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChannelHeader: []byte{1, 2, 3},
},
}),
})}},
},
},
{
name: "invalid config block",
expectedError: "invalid config envelope",
block: &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(&common.Envelope{
Payload: protoutil.MarshalOrPanic(&common.Payload{
Data: []byte{1, 2, 3},
Header: &common.Header{
ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{
Type: int32(common.HeaderType_CONFIG),
}),
},
}),
})}},
},
},
} {
t.Run(testCase.name, func(t *testing.T) {
conf, err := deliverclient.ConfigFromBlock(testCase.block)
require.Nil(t, conf)
require.Error(t, err)
require.Contains(t, err.Error(), testCase.expectedError)
})
}
}
29 changes: 16 additions & 13 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ type Config struct {
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer.
Gossip blocksprovider.GossipServiceAdapter
// OrdererSource provides orderer endpoints, complete with TLS cert pools.
OrdererSource *orderers.ConnectionSource
// OrdererEndpointOverrides provides peer-specific orderer endpoints overrides.
// These are loaded once when the peer starts.
OrdererEndpointOverrides map[string]*orderers.Endpoint
// Signer is the identity used to sign requests.
Signer identity.SignerSerializer
// DeliverServiceConfig is the configuration object.
Expand Down Expand Up @@ -191,14 +192,15 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return !d.conf.IsStaticLeader
},
Expand All @@ -212,7 +214,7 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
dc.TLSCertHash = util.ComputeSHA256(cert.Certificate[0])
}

dc.Initialize()
dc.Initialize(d.conf.ChannelConfig)

return dc, nil
}
Expand Down Expand Up @@ -254,7 +256,8 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Expand All @@ -277,7 +280,7 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
dcBFT.TLSCertHash = util.ComputeSHA256(cert.Certificate[0])
}

dcBFT.Initialize()
dcBFT.Initialize(d.conf.ChannelConfig)

return dcBFT, nil
}
Expand Down
25 changes: 1 addition & 24 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,36 +288,13 @@ func (p *Peer) createChannel(
mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager())
}

osLogger := flogging.MustGetLogger("peer.orderers")
namedOSLogger := osLogger.With("channel", cid)
ordererSource := orderers.NewConnectionSource(namedOSLogger, p.OrdererEndpointOverrides)

ordererSourceCallback := func(bundle *channelconfig.Bundle) {
globalAddresses := bundle.ChannelConfig().OrdererAddresses()
orgAddresses := map[string]orderers.OrdererOrg{}
if ordererConfig, ok := bundle.OrdererConfig(); ok {
for orgName, org := range ordererConfig.Organizations() {
var certs [][]byte
certs = append(certs, org.MSP().GetTLSRootCerts()...)
certs = append(certs, org.MSP().GetTLSIntermediateCerts()...)

orgAddresses[orgName] = orderers.OrdererOrg{
Addresses: org.Endpoints(),
RootCerts: certs,
}
}
}
ordererSource.Update(globalAddresses, orgAddresses)
}

channel := &Channel{
ledger: l,
resources: bundle,
cryptoProvider: p.CryptoProvider,
}

callbacks := []channelconfig.BundleActor{
ordererSourceCallback,
gossipCallbackWrapper,
trustedRootsCallbackWrapper,
mspCallback,
Expand Down Expand Up @@ -373,7 +350,7 @@ func (p *Peer) createChannel(

p.GossipService.InitializeChannel(
bundle.ConfigtxValidator().ChannelID(),
ordererSource,
p.OrdererEndpointOverrides,
store,
gossipservice.Support{
Validator: validator,
Expand Down
30 changes: 18 additions & 12 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type GossipServiceAdapter interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
Service(g GossipServiceAdapter, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
}

type deliveryFactoryImpl struct {
Expand All @@ -139,21 +139,20 @@ type deliveryFactoryImpl struct {
// Returns an instance of delivery service
func (df *deliveryFactoryImpl) Service(
g GossipServiceAdapter,
ordererSource *orderers.ConnectionSource,
mcs api.MessageCryptoService, // TODO remove
ordererEndpointOverrides map[string]*orderers.Endpoint,
isStaticLead bool,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) deliverservice.DeliverService {
return deliverservice.NewDeliverService(
&deliverservice.Config{
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererSource: ordererSource,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererEndpointOverrides: ordererEndpointOverrides,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
})
}

Expand Down Expand Up @@ -334,7 +333,14 @@ type Support struct {
}

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *GossipService) InitializeChannel(channelID string, ordererSource *orderers.ConnectionSource, store *transientstore.Store, support Support, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) {
func (g *GossipService) InitializeChannel(
channelID string,
ordererEndpointOverrides map[string]*orderers.Endpoint,
store *transientstore.Store,
support Support,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) {
g.lock.Lock()
defer g.lock.Unlock()
// Initialize new state provider for given committer
Expand Down Expand Up @@ -393,7 +399,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
blockingMode,
stateConfig)
if g.deliveryService[channelID] == nil {
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererEndpointOverrides, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
}

// Delivery service might be nil only if it was not able to get connected
Expand Down
Loading