Skip to content

Commit

Permalink
Decompose writes method
Browse files Browse the repository at this point in the history
Remove unused struct member and to-do comments.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
  • Loading branch information
twoGiants committed Feb 19, 2025
1 parent 3943e39 commit 7ede469
Showing 1 changed file with 35 additions and 32 deletions.
67 changes: 35 additions & 32 deletions off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"

"github.com/hyperledger/fabric-gateway/pkg/client"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -65,7 +66,6 @@ func listen(clientConnection grpc.ClientConnInterface) error {
parser.ParseBlock(blockProto),
checkpointer,
applyWritesToOffChainStore,
channelName,
}

if err := aBlockProcessor.process(); err != nil {
Expand All @@ -81,7 +81,6 @@ type blockProcessor struct {
parsedBlock *parser.Block
checkpointer *client.FileCheckpointer
writeToStore writer
channelName string
}

func (b *blockProcessor) process() error {
Expand All @@ -96,9 +95,7 @@ func (b *blockProcessor) process() error {
txProcessor := transactionProcessor{
b.parsedBlock.Number(),
validTransaction,
// TODO use reference to parent and get blockNumber, store and channelName from parent
b.writeToStore,
b.channelName,
}
if err := txProcessor.process(); err != nil {
return err
Expand All @@ -118,7 +115,7 @@ func (b *blockProcessor) process() error {
}

func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) {
result := []*parser.Transaction{}
var result []*parser.Transaction
newTransactions, err := b.getNewTransactions()
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,7 +155,7 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) {
return 0, err
}

blockTransactionIDs := []string{}
var blockTransactionIDs []string
for _, transaction := range transactions {
blockTransactionIDs = append(blockTransactionIDs, transaction.ChannelHeader().GetTxId())
}
Expand Down Expand Up @@ -188,7 +185,6 @@ type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
writeToStore writer
channelName string
}

func (t *transactionProcessor) process() error {
Expand Down Expand Up @@ -218,43 +214,50 @@ func (t *transactionProcessor) process() error {
}

func (t *transactionProcessor) writes() ([]write, error) {
// TODO this entire code should live in the parser and just return the kvWrite which
// we then map to write and return
t.channelName = t.transaction.ChannelHeader().GetChannelId()

nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
nsReadWriteSets, err := t.nonSystemCCReadWriteSets()
if err != nil {
return nil, err
}

nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{}
var result []write
for _, nsReadWriteSet := range nsReadWriteSets {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
kvReadWriteSet, err := nsReadWriteSet.ReadWriteSet()
if err != nil {
return nil, err
}

result = t.newWrites(kvReadWriteSet, nsReadWriteSet.Namespace())
}

writes := []write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()
return result, nil
}

kvReadWriteSet, err := readWriteSet.ReadWriteSet()
if err != nil {
return nil, err
}
func (t *transactionProcessor) nonSystemCCReadWriteSets() ([]*parser.NamespaceReadWriteSet, error) {
nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil {
return nil, err
}

for _, kvWrite := range kvReadWriteSet.GetWrites() {
writes = append(writes, write{
ChannelName: t.channelName,
Namespace: namespace,
Key: kvWrite.GetKey(),
IsDelete: kvWrite.GetIsDelete(),
Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
return slices.DeleteFunc(nsReadWriteSets, func(nsReadWriteSet *parser.NamespaceReadWriteSet) bool {
return t.isSystemChaincode(nsReadWriteSet.Namespace())
}), nil
}

func (t *transactionProcessor) newWrites(kvReadWriteSet *kvrwset.KVRWSet, namespace string) []write {
channelName := t.transaction.ChannelHeader().GetChannelId()

var result []write
for _, kvWrite := range kvReadWriteSet.GetWrites() {
result = append(result, write{
ChannelName: channelName,
Namespace: namespace,
Key: kvWrite.GetKey(),
IsDelete: kvWrite.GetIsDelete(),
Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}

return writes, nil
return result
}

func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
Expand Down

0 comments on commit 7ede469

Please sign in to comment.