Skip to content

Commit

Permalink
[FAB-4768] don't kill deliver clients on first err
Browse files Browse the repository at this point in the history
Deliver clients no longer fail at the first sign of
trouble with the orderer/kafka cluster link.

- Custom sarama logger enhanced to allow listening to logging
  'events'.

Change-Id: I4a30e86ff77fd3555e1b67cf79fd44a04ab352f9
Signed-off-by: Luis Sanchez <[email protected]>
  • Loading branch information
Luis Sanchez committed Sep 26, 2017
1 parent 7c12814 commit d151ef8
Show file tree
Hide file tree
Showing 9 changed files with 701 additions and 21 deletions.
2 changes: 1 addition & 1 deletion orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func initializeMultichannelRegistrar(conf *config.TopLevel, signer crypto.LocalS

consenters := make(map[string]consensus.Consenter)
consenters["solo"] = solo.New()
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version, conf.Kafka.Verbose)
consenters["kafka"] = kafka.New(conf.Kafka)

return multichannel.NewRegistrar(lf, consenters, signer)
}
2 changes: 1 addition & 1 deletion orderer/common/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (mbs *mockBroadcastSrv) Recv() (*cb.Envelope, error) {
return mbs.msg, mbs.err
}

func (mbs *mockBroadcastSrv) Send(br *ab.BroadcastResponse) error {
func (mb *mockBroadcastSrv) Send(br *ab.BroadcastResponse) error {
panic("Unimplimented")
}

Expand Down
78 changes: 70 additions & 8 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
}
}()

subscription := fmt.Sprintf("added subscription to %s/%d", chain.channel.topic(), chain.channel.partition())
var topicPartitionSubscriptionResumed <-chan string
var deliverSessionTimer *time.Timer
var deliverSessionTimedOut <-chan time.Time

for {
select {
case <-chain.haltChan:
Expand All @@ -241,22 +246,79 @@ func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
select {
case <-chain.errorChan: // If already closed, don't do anything
default:
close(chain.errorChan)

switch kafkaErr.Err {
case sarama.ErrOffsetOutOfRange:
// the kafka consumer will auto retry for all errors except for ErrOffsetOutOfRange
logger.Errorf("[channel: %s] Unrecoverable error during consumption: %s", chain.ChainID(), kafkaErr)
close(chain.errorChan)
default:
if topicPartitionSubscriptionResumed == nil {
// register listener
topicPartitionSubscriptionResumed = saramaLogger.NewListener(subscription)
// start session timout timer
deliverSessionTimer = time.NewTimer(chain.consenter.retryOptions().NetworkTimeouts.ReadTimeout)
deliverSessionTimedOut = deliverSessionTimer.C
}
}
}
select {
case <-chain.errorChan: // we are not ignoring the error
logger.Warningf("[channel: %s] Closed the errorChan", chain.ChainID())
// This covers the edge case where (1) a consumption error has
// closed the errorChan and thus rendered the chain unavailable to
// deliver clients, (2) we're already at the newest offset, and (3)
// there are no new Broadcast requests coming in. In this case,
// there is no trigger that can recreate the errorChan again and
// mark the chain as available, so we have to force that trigger via
// the emission of a CONNECT message. TODO Consider rate limiting
go sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)
default: // we are ignoring the error
logger.Warningf("[channel: %s] Deliver sessions will be dropped if consumption errors continue.", chain.ChainID())
}
case <-topicPartitionSubscriptionResumed:
// stop listening for subscription message
saramaLogger.RemoveListener(subscription, topicPartitionSubscriptionResumed)
// disable subscription event chan
topicPartitionSubscriptionResumed = nil

// stop timeout timer
if !deliverSessionTimer.Stop() {
<-deliverSessionTimer.C
}
logger.Warningf("[channel: %s] Consumption will resume.", chain.ChainID())

case <-deliverSessionTimedOut:
// stop listening for subscription message
saramaLogger.RemoveListener(subscription, topicPartitionSubscriptionResumed)
// disable subscription event chan
topicPartitionSubscriptionResumed = nil

close(chain.errorChan)
logger.Warningf("[channel: %s] Closed the errorChan", chain.ChainID())
// This covers the edge case where (1) a consumption error has
// closed the errorChan and thus rendered the chain unavailable to
// deliver clients, (2) we're already at the newest offset, and (3)
// there are no new Broadcast requests coming in. In this case,
// there is no trigger that can recreate the errorChan again and
// mark the chain as available, so we have to force that trigger via
// the emission of a CONNECT message. TODO Consider rate limiting

// make chain available again via CONNECT message trigger
go sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)

case in, ok := <-chain.channelConsumer.Messages():
if !ok {
logger.Criticalf("[channel: %s] Kafka consumer closed.", chain.ChainID())
return counts, nil
}

// catch the possibility that we missed a topic subscription event before
// we registered the event listener
if topicPartitionSubscriptionResumed != nil {
// stop listening for subscription message
saramaLogger.RemoveListener(subscription, topicPartitionSubscriptionResumed)
// disable subscription event chan
topicPartitionSubscriptionResumed = nil
// stop timeout timer
if !deliverSessionTimer.Stop() {
<-deliverSessionTimer.C
}
}

select {
case <-chain.errorChan: // If this channel was closed...
chain.errorChan = make(chan struct{}) // ...make a new one.
Expand Down
Loading

0 comments on commit d151ef8

Please sign in to comment.