Skip to content

Commit

Permalink
[FAB-5535]: Add coordinator for blocks and pvtdata
Browse files Browse the repository at this point in the history
Currently there is a state transfer layer in gossip, which takes care to
reordering the blocks according to the sequence number order, as well it
takes care to replicate blocks in case of network distruptions and there
are missing blocks in peer state.

In context of work required for FAB-1151, where there is private data
which distributed off the leadger, meaning not a part of transaction
proposal submitted to the ordering service. There is a need to
coordinate availability of this private data and the block itself.

Therefore this commit adds new entity to orchestrate the arrival of blocks
and the readiness of the block for commit, e.g. when peer will have all
relevant private data kept in transient store coordinator will commit
it into the ledger providing required private data.

Change-Id: I9cf47c9647e3f7fc949c5af2c6cd73c991f970d5
Signed-off-by: Artem Barger <[email protected]>
  • Loading branch information
C0rWin committed Aug 8, 2017
1 parent c9426cf commit 8527376
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 20 deletions.
60 changes: 60 additions & 0 deletions gossip/state/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package state

import (
"fmt"

"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
)

// PvtData a placeholder to represent private data
type PvtData struct {
Payload *rwset.TxPvtReadWriteSet
}

// Coordinator orchestrates the flow of the new
// blocks arrival and in flight transient data, responsible
// to complete missing parts of transient data for given block.
type Coordinator interface {
// StoreBlock deliver new block with underlined private data
// returns missing transaction ids
StoreBlock(block *common.Block, data ...[]*PvtData) ([]string, error)

// GetBlockByNum returns block and related to the block private data
GetBlockByNum(seqNum uint64) (*common.Block, []*PvtData, error)

// Get recent block sequence number
LedgerHeight() (uint64, error)

// Close coordinator, shuts down coordinator service
Close()
}

type coordinator struct {
committer.Committer
}

// NewCoordinator creates a new instance of coordinator
func NewCoordinator(committer committer.Committer) Coordinator {
return &coordinator{Committer: committer}
}

func (c *coordinator) StoreBlock(block *common.Block, data ...[]*PvtData) ([]string, error) {
// Need to check whenever there are missing private rwset
return nil, c.Committer.Commit(block)
}

func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, []*PvtData, error) {
blocks := c.Committer.GetBlocks([]uint64{seqNum})
if len(blocks) == 0 {
return nil, nil, fmt.Errorf("Cannot retreive block number %d", seqNum)
}
return blocks[0], nil, nil
}
98 changes: 78 additions & 20 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/op/go-logging"
)

Expand Down Expand Up @@ -91,7 +92,7 @@ type GossipStateProviderImpl struct {
// Queue of payloads which wasn't acquired yet
payloads PayloadsBuffer

committer committer.Committer
coordinator Coordinator

stateResponseCh chan proto.ReceivedMessage

Expand All @@ -112,8 +113,11 @@ func init() {
logger = util.GetLogger(util.LoggingStateModule, "")
}

// NewGossipStateProvider creates initialized instance of gossip state provider
func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider {
// NewGossipCoordinatedStateProvider creates state provider with coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,
coordinator Coordinator, mcs api.MessageCryptoService) GossipStateProvider {

logger := util.GetLogger(util.LoggingStateModule, "")

gossipChan, _ := g.Accept(func(message interface{}) bool {
Expand Down Expand Up @@ -145,7 +149,7 @@ func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer
// Filter message which are only relevant for nodeMetastate transfer
_, commChan := g.Accept(remoteStateMsgFilter, true)

height, err := committer.LedgerHeight()
height, err := coordinator.LedgerHeight()
if height == 0 {
// Panic here since this is an indication of invalid situation which should not happen in normal
// code path.
Expand Down Expand Up @@ -178,7 +182,7 @@ func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer
// Create a queue for payload received
payloads: NewPayloadsBuffer(height),

committer: committer,
coordinator: coordinator,

stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize),

Expand Down Expand Up @@ -218,6 +222,12 @@ func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer
return s
}

// NewGossipStateProvider creates initialized instance of gossip state provider with committer
// which is wrapped up into coordinator, kept for API compatibility
func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider {
return NewGossipCoordinatedStateProvider(chainID, g, NewCoordinator(committer), mcs)
}

func (s *GossipStateProviderImpl) listen() {
defer s.done.Done()

Expand Down Expand Up @@ -304,7 +314,7 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
return
}

currentHeight, err := s.committer.LedgerHeight()
currentHeight, err := s.coordinator.LedgerHeight()
if err != nil {
logger.Errorf("Cannot access to current ledger height, due to %s", err)
return
Expand All @@ -318,22 +328,51 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)

response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)}
for seqNum := request.StartSeqNum; seqNum <= endSeqNum; seqNum++ {
logger.Debug("Reading block ", seqNum, " from the committer service")
blocks := s.committer.GetBlocks([]uint64{seqNum})
logger.Debug("Reading block ", seqNum, " with private data from the coordinator service")
block, pvtData, err := s.coordinator.GetBlockByNum(seqNum)

if len(blocks) == 0 {
if err != nil {
logger.Errorf("Wasn't able to read block with sequence number %d from ledger, "+
"due to %s skipping....", seqNum, err)
continue
}

if block == nil {
logger.Errorf("Wasn't able to read block with sequence number %d from ledger, skipping....", seqNum)
continue
}

blockBytes, err := pb.Marshal(blocks[0])
blockBytes, err := pb.Marshal(block)

// Marshal private data
if err != nil {
logger.Errorf("Could not marshal block: %s", err)
continue
}

// TODO: Need to extract orgID of the requester and filter out
// private data entries which doesn't belongs to collections
// allowed for sender organization based on policies
pvtDataBytes := make([][]byte, 0)
err = nil
for index, each := range pvtData {
pvtBytes, err := pb.Marshal(each.Payload)
if err != nil {
logger.Errorf("Could not marshal private rwset index %d, due to %s", index, err)
break
}
pvtDataBytes = append(pvtDataBytes, pvtBytes)
}
if err != nil {
logger.Errorf("Failed to marshal private rwset for block %d due to %s", seqNum, err)
continue
}

// Appending result to the response
response.Payloads = append(response.Payloads, &proto.Payload{
SeqNum: seqNum,
Data: blockBytes,
SeqNum: seqNum,
Data: blockBytes,
PrivateData: pvtDataBytes,
})
}
// Sending back response with missing blocks
Expand Down Expand Up @@ -380,7 +419,7 @@ func (s *GossipStateProviderImpl) Stop() {
// Make sure all go-routines has finished
s.done.Wait()
// Close all resources
s.committer.Close()
s.coordinator.Close()
close(s.stateRequestCh)
close(s.stateResponseCh)
close(s.stopCh)
Expand Down Expand Up @@ -428,7 +467,24 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
continue
}
logger.Debug("New block with claimed sequence number ", payload.SeqNum, " transactions num ", len(rawBlock.Data.Data))
if err := s.commitBlock(rawBlock); err != nil {

// Read all private data into slice
pvt := make([]*PvtData, 0)
var err error
for _, each := range payload.PrivateData {
payload := &rwset.TxPvtReadWriteSet{}
if err = pb.Unmarshal(each, payload); err != nil {
break
}
pvt = append(pvt, &PvtData{Payload: payload})
}

if err != nil {
logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%s)...dropping block", payload.SeqNum, err)
continue
}

if err := s.commitBlock(rawBlock, pvt); err != nil {
logger.Panicf("Cannot commit block to the ledger due to %s", err)
}
}
Expand All @@ -450,7 +506,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
s.stopCh <- struct{}{}
return
case <-time.After(defAntiEntropyInterval):
current, err := s.committer.LedgerHeight()
current, err := s.coordinator.LedgerHeight()
if err != nil {
// Unable to read from ledger continue to the next round
logger.Error("Cannot obtain ledger height, due to", err)
Expand Down Expand Up @@ -603,8 +659,8 @@ func (s *GossipStateProviderImpl) hasRequiredHeight(height uint64) func(peer dis
func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {
// Try to read missing block from the ledger, should return no nil with
// content including at least one block
if blocks := s.committer.GetBlocks([]uint64{index}); blocks != nil && len(blocks) > 0 {
return blocks[0]
if block, _, err := s.coordinator.GetBlockByNum(index); block != nil && err != nil {
return block
}

return nil
Expand All @@ -616,7 +672,7 @@ func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
return errors.New("Given payload is nil")
}
logger.Debug("Adding new payload into the buffer, seqNum = ", payload.SeqNum)
height, err := s.committer.LedgerHeight()
height, err := s.coordinator.LedgerHeight()
if err != nil {
return fmt.Errorf("Failed obtaining ledger height: %v", err)
}
Expand All @@ -628,8 +684,10 @@ func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
return s.payloads.Push(payload)
}

func (s *GossipStateProviderImpl) commitBlock(block *common.Block) error {
if err := s.committer.Commit(block); err != nil {
func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData []*PvtData) error {

// Commit block with available private transactions
if _, err := s.coordinator.StoreBlock(block, pvtData); err != nil {
logger.Errorf("Got error while committing(%s)", err)
return err
}
Expand Down
Loading

0 comments on commit 8527376

Please sign in to comment.