From 7871c2683fe6f5741b5debce409b9156bcc47963 Mon Sep 17 00:00:00 2001 From: yacovm Date: Wed, 26 May 2021 01:35:43 +0300 Subject: [PATCH] Optionally disable gossip block forwarding (#2606) This commit adds a new configuration option to the peer which makes peers not forward blocks that they pull from the ordering service. If all peers in an organization explicitly set "peer.deliveryclient.blockGossipEnabled" to false, no peer in the organization gossips blocks to any other peer in that organization. Change-Id: I5d9b278ae72f239129827c044fa78179f6ba87ab Signed-off-by: Yacov Manevich (cherry picked from commit 4e201afc69efb6d6546e1be904c5e658ca56b9a0) --- core/deliverservice/config.go | 9 ++++++++ core/deliverservice/config_test.go | 2 ++ core/deliverservice/deliveryclient.go | 19 ++++++++-------- .../pkg/peer/blocksprovider/blocksprovider.go | 14 ++++++++---- .../blocksprovider/blocksprovider_test.go | 22 +++++++++++++++++++ sampleconfig/core.yaml | 5 +++++ 6 files changed, 58 insertions(+), 13 deletions(-) diff --git a/core/deliverservice/config.go b/core/deliverservice/config.go index 38a13caf98b..5f265175dda 100644 --- a/core/deliverservice/config.go +++ b/core/deliverservice/config.go @@ -29,6 +29,8 @@ const ( type DeliverServiceConfig struct { // PeerTLSEnabled enables/disables Peer TLS. PeerTLSEnabled bool + // BlockGossipEnabled enables block forwarding via gossip + BlockGossipEnabled bool // ReConnectBackoffThreshold sets the delivery service maximal delay between consencutive retries. ReConnectBackoffThreshold time.Duration // ReconnectTotalTimeThreshold sets the total time the delivery service may spend in reconnection attempts @@ -95,6 +97,13 @@ func LoadOverridesMap() (map[string]*orderers.Endpoint, error) { } func (c *DeliverServiceConfig) loadDeliverServiceConfig() { + enabledKey := "peer.deliveryclient.blockGossipEnabled" + enabledConfigOptionMissing := !viper.IsSet(enabledKey) + if enabledConfigOptionMissing { + logger.Infof("peer.deliveryclient.blockGossipEnabled is not set, defaulting to true.") + } + c.BlockGossipEnabled = enabledConfigOptionMissing || viper.GetBool(enabledKey) + c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled") c.ReConnectBackoffThreshold = viper.GetDuration("peer.deliveryclient.reConnectBackoffThreshold") diff --git a/core/deliverservice/config_test.go b/core/deliverservice/config_test.go index 2ecc13e979c..424660f1f95 100644 --- a/core/deliverservice/config_test.go +++ b/core/deliverservice/config_test.go @@ -93,6 +93,7 @@ func TestGlobalConfig(t *testing.T) { coreConfig := deliverservice.GlobalConfig() expectedConfig := &deliverservice.DeliverServiceConfig{ + BlockGossipEnabled: true, PeerTLSEnabled: true, ReConnectBackoffThreshold: 25 * time.Second, ReconnectTotalTimeThreshold: 20 * time.Second, @@ -119,6 +120,7 @@ func TestGlobalConfigDefault(t *testing.T) { coreConfig := deliverservice.GlobalConfig() expectedConfig := &deliverservice.DeliverServiceConfig{ + BlockGossipEnabled: true, PeerTLSEnabled: false, ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold, ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold, diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 88356448d70..41b5f6b4c6c 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -128,15 +128,16 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b Dialer: DialerAdapter{ Client: d.conf.DeliverGRPCClient, }, - Orderers: d.conf.OrdererSource, - DoneC: make(chan struct{}), - Signer: d.conf.Signer, - DeliverStreamer: DeliverAdapter{}, - Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), - MaxRetryDelay: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, - MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold, - InitialRetryDelay: 100 * time.Millisecond, - YieldLeadership: !d.conf.IsStaticLeader, + Orderers: d.conf.OrdererSource, + DoneC: make(chan struct{}), + Signer: d.conf.Signer, + DeliverStreamer: DeliverAdapter{}, + Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), + MaxRetryDelay: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, + MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold, + BlockGossipDisabled: !d.conf.DeliverServiceConfig.BlockGossipEnabled, + InitialRetryDelay: 100 * time.Millisecond, + YieldLeadership: !d.conf.IsStaticLeader, } if d.conf.DeliverGRPCClient.MutualTLSRequired() { diff --git a/internal/pkg/peer/blocksprovider/blocksprovider.go b/internal/pkg/peer/blocksprovider/blocksprovider.go index fb88596521c..788d8656fa1 100644 --- a/internal/pkg/peer/blocksprovider/blocksprovider.go +++ b/internal/pkg/peer/blocksprovider/blocksprovider.go @@ -96,9 +96,10 @@ type Deliverer struct { Logger *flogging.FabricLogger YieldLeadership bool - MaxRetryDelay time.Duration - InitialRetryDelay time.Duration - MaxRetryDuration time.Duration + BlockGossipDisabled bool + MaxRetryDelay time.Duration + InitialRetryDelay time.Duration + MaxRetryDuration time.Duration // TLSCertHash should be nil when TLS is not enabled TLSCertHash []byte // util.ComputeSHA256(b.credSupport.GetClientCertificate().Certificate[0]) @@ -111,6 +112,9 @@ const backoffExponentBase = 1.2 // DeliverBlocks used to pull out blocks from the ordering service to // distributed them across peers func (d *Deliverer) DeliverBlocks() { + if d.BlockGossipDisabled { + d.Logger.Infof("Will pull blocks without forwarding them to remote peers via gossip") + } failureCounter := 0 totalDuration := time.Duration(0) @@ -256,7 +260,9 @@ func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error { d.Logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err) return errors.WithMessage(err, "could not add block as payload") } - + if d.BlockGossipDisabled { + return nil + } // Gossip messages with other nodes d.Logger.Debugf("Gossiping block [%d]", blockNum) d.Gossip.Gossip(gossipMsg) diff --git a/internal/pkg/peer/blocksprovider/blocksprovider_test.go b/internal/pkg/peer/blocksprovider/blocksprovider_test.go index fa54367be8a..978b1ab2dfb 100644 --- a/internal/pkg/peer/blocksprovider/blocksprovider_test.go +++ b/internal/pkg/peer/blocksprovider/blocksprovider_test.go @@ -533,6 +533,28 @@ var _ = Describe("Blocksprovider", func() { }, })) }) + + When("gossip dissemination is disabled", func() { + BeforeEach(func() { + d.BlockGossipDisabled = true + }) + + It("doesn't gossip, only adds to the payload buffer", func() { + Eventually(fakeGossipServiceAdapter.AddPayloadCallCount).Should(Equal(1)) + channelID, payload := fakeGossipServiceAdapter.AddPayloadArgsForCall(0) + Expect(channelID).To(Equal("channel-id")) + Expect(payload).To(Equal(&gossip.Payload{ + Data: protoutil.MarshalOrPanic(&common.Block{ + Header: &common.BlockHeader{ + Number: 8, + }, + }), + SeqNum: 8, + })) + + Consistently(fakeGossipServiceAdapter.GossipCallCount).Should(Equal(0)) + }) + }) }) When("the deliver client returns a status", func() { diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 17523ce51b0..d89d93ab625 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -343,6 +343,11 @@ peer: # Delivery service related config deliveryclient: + # Enables this peer to disseminate blocks it pulled from the ordering service + # via gossip. + # Note that 'gossip.state.enabled' controls point to point block replication + # of blocks committed in the past. + blockGossipEnabled: true # It sets the total time the delivery service may spend in reconnection # attempts until its retry logic gives up and returns an error reconnectTotalTimeThreshold: 3600s