Skip to content

Commit

Permalink
[FAB-5711] Fix errors in config update msg process.
Browse files Browse the repository at this point in the history
This patch adds `LastOffsetPersisted` to the block that
is cut in config update message processing path.
Also, `lastCutBlockNumber` is incremented in this path.

Change-Id: I07a31b33b0533d3da145b033c6aa6ffd6f9cc61c
Signed-off-by: Jay Guo <[email protected]>
  • Loading branch information
guoger committed Aug 11, 2017
1 parent 8527376 commit 3c663df
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 12 deletions.
8 changes: 6 additions & 2 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,14 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.Co
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset - 1})
support.WriteBlock(block, encodedLastOffsetPersisted)
*lastCutBlockNumber++
}
block := support.CreateNextBlock([]*cb.Envelope{env})
support.WriteConfigBlock(block, nil)
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset})
support.WriteConfigBlock(block, encodedLastOffsetPersisted)
*lastCutBlockNumber++
*timer = nil
case msgprocessor.NormalMsg:
_, err := support.ProcessNormalMsg(env)
Expand Down
152 changes: 142 additions & 10 deletions orderer/consensus/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
mockmultichannel "github.com/hyperledger/fabric/orderer/mocks/common/multichannel"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -888,11 +889,10 @@ func TestProcessMessagesToBlocks(t *testing.T) {
})

t.Run("ReceiveTwoRegularAndCutTwoBlocks", func(t *testing.T) {
subtestIndex++

if testing.Short() {
t.Skip("Skipping test in short mode")
}
subtestIndex++

errorChan := make(chan struct{})
close(errorChan)
Expand Down Expand Up @@ -968,14 +968,13 @@ func TestProcessMessagesToBlocks(t *testing.T) {
assert.Equal(t, uint64(2), counts[indexRecvPass], "Expected 2 messages received and unmarshaled")
assert.Equal(t, uint64(2), counts[indexProcessRegularPass], "Expected 2 REGULAR messages processed")
assert.Equal(t, lastCutBlockNumber+2, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by two")
assert.Equal(t, expectedOffset+1, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", newestOffset+1)
assert.Equal(t, expectedOffset+2, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", newestOffset+2)
assert.Equal(t, expectedOffset+1, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", expectedOffset+1)
assert.Equal(t, expectedOffset+2, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", expectedOffset+2)
})

t.Run("ReceiveRegularAndSendTimeToCut", func(t *testing.T) {
subtestIndex++

t.Skip("Skipping test as it introduces a race condition")
subtestIndex++

// NB We haven't set a handlermap for the mock broker so we need to set
// the ProduceResponse
Expand Down Expand Up @@ -1049,9 +1048,8 @@ func TestProcessMessagesToBlocks(t *testing.T) {
// - Consumer.Retry.Backoff
// - Metadata.Retry.Max

subtestIndex++

t.Skip("Skipping test as it introduces a race condition")
subtestIndex++

// Exact same test as ReceiveRegularAndSendTimeToCut.
// Only difference is that the producer's attempt to send a TTC will
Expand Down Expand Up @@ -1401,9 +1399,8 @@ func TestProcessMessagesToBlocks(t *testing.T) {
})

t.Run("ReceiveKafkaErrorAndThenReceiveRegularMessage", func(t *testing.T) {
subtestIndex++

t.Skip("Skipping test as it introduces a race condition")
subtestIndex++

// If we set up the mock broker so that it returns a response, if the
// test finishes before the sendConnectMessage goroutine has received
Expand Down Expand Up @@ -1486,4 +1483,139 @@ func TestProcessMessagesToBlocks(t *testing.T) {
logger.Debug("haltChan closed")
<-done
})

t.Run("ReceiveBadConfigEnvelope", func(t *testing.T) {
subtestIndex++

errorChan := make(chan struct{})
close(errorChan)
haltChan := make(chan struct{})

lastCutBlockNumber := uint64(3)

mockSupport := &mockmultichannel.ConsenterSupport{
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
ClassifyMsgVal: msgprocessor.ConfigUpdateMsg,
ProcessNormalMsgErr: fmt.Errorf("Error processing config update"),
SharedConfigVal: &mockconfig.Orderer{
BatchTimeoutVal: longTimeout,
},
}
defer close(mockSupport.BlockCutterVal.Block)

bareMinimumChain := &chainImpl{
parentConsumer: mockParentConsumer,
channelConsumer: mockChannelConsumer,

channel: mockChannel,
support: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
}

var counts []uint64
done := make(chan struct{})

go func() {
counts, err = bareMinimumChain.processMessagesToBlocks()
done <- struct{}{}
}()

// This is the config wrappedMessage that the for-loop will process.
mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage")))))

logger.Debug("Closing haltChan to exit the infinite for-loop")
// We are guaranteed to hit the haltChan branch after hitting the REGULAR branch at least once
close(haltChan) // Identical to chain.Halt()
logger.Debug("haltChan closed")
<-done

assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors")
assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled")
assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed")
assert.Equal(t, lastCutBlockNumber, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber not to be incremented")
})

t.Run("ReceiveConfigEnvelopeAndCut", func(t *testing.T) {
subtestIndex++

errorChan := make(chan struct{})
close(errorChan)
haltChan := make(chan struct{})

lastCutBlockNumber := uint64(3)

mockSupport := &mockmultichannel.ConsenterSupport{
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
SharedConfigVal: &mockconfig.Orderer{
BatchTimeoutVal: longTimeout,
},
}
defer close(mockSupport.BlockCutterVal.Block)

bareMinimumChain := &chainImpl{
parentConsumer: mockParentConsumer,
channelConsumer: mockChannelConsumer,

channel: mockChannel,
support: mockSupport,
lastCutBlockNumber: lastCutBlockNumber,

errorChan: errorChan,
haltChan: haltChan,
}

var counts []uint64
done := make(chan struct{})

go func() {
counts, err = bareMinimumChain.processMessagesToBlocks()
done <- struct{}{}
}()

// This is the normal wrappedMessage that the for-loop will process
mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage")))))
mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return
logger.Debugf("Mock blockcutter's Ordered call has returned")

// This is the config wrappedMessage that the for-loop will process.
mockSupport.ClassifyMsgVal = msgprocessor.ConfigUpdateMsg
mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage")))))

var normalBlk, configBlk *cb.Block
select {
case normalBlk = <-mockSupport.Blocks: // Let the `mockConsenterSupport.WriteBlock` proceed
case <-time.After(shortTimeout):
logger.Fatalf("Did not receive a block from the blockcutter as expected")
}

select {
case configBlk = <-mockSupport.Blocks:
case <-time.After(shortTimeout):
logger.Fatalf("Did not receive a block from the blockcutter as expected")
}

logger.Debug("Closing haltChan to exit the infinite for-loop")
// We are guaranteed to hit the haltChan branch after hitting the REGULAR branch at least once
close(haltChan) // Identical to chain.Halt()
logger.Debug("haltChan closed")
<-done

expectedOffset := newestOffset + int64(subtestIndex) // TODO Hacky, revise eventually

assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors")
assert.Equal(t, uint64(2), counts[indexRecvPass], "Expected 1 message received and unmarshaled")
assert.Equal(t, uint64(2), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed")
assert.Equal(t, lastCutBlockNumber+2, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be incremented by 2")
assert.Equal(t, expectedOffset+1, extractEncodedOffset(normalBlk.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", expectedOffset+1)
assert.Equal(t, expectedOffset+2, extractEncodedOffset(configBlk.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in second block to be %d", expectedOffset+2)
})
}

0 comments on commit 3c663df

Please sign in to comment.