Skip to content

Commit

Permalink
Report correct reason of stream abort in orderer cluster
Browse files Browse the repository at this point in the history
This commit fixes a bug that makes the cluster communication infrastructure
always report an "aborted" reason after a stream terminates.

The bug was that the serviceStream() method, upon exiting, always overwrote the real
reason the stream was terminated with the same generic "aborted" reason.

Storing the stream termination reason inside the sync.Once block, along with
the rest of the termination logic, fixes this bug: only the first reason is recorded.

Change-Id: I7060a3c5630c6d28c73f025de8b85061077638d6
Signed-off-by: Yacov Manevich <[email protected]>
(cherry picked from commit f0584c6)
  • Loading branch information
yacovm authored and denyeart committed Feb 18, 2021
1 parent 2b2e154 commit fae13c3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 15 deletions.
28 changes: 13 additions & 15 deletions orderer/common/cluster/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,9 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) {
func (stream *Stream) serviceStream() {
streamStartTime := time.Now()
defer func() {
stream.Logger.Debugf("Stream %d to (%s) terminating at total lifetime of %s",
stream.ID, stream.Endpoint, time.Since(streamStartTime))

stream.Cancel(errAborted)
stream.Logger.Debugf("Stream %d to (%s) terminated with total lifetime of %s",
stream.ID, stream.Endpoint, time.Since(streamStartTime))
}()

for {
Expand Down Expand Up @@ -666,21 +665,20 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
var canceled uint32

abortChan := make(chan struct{})

abort := func() {
cancel()
rc.streamsByID.Delete(streamID)
rc.Metrics.reportEgressStreamCount(rc.Channel, atomic.LoadUint32(&rc.streamsByID.size))
rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint)
atomic.StoreUint32(&canceled, 1)
close(abortChan)
}
abortReason := &atomic.Value{}

once := &sync.Once{}
abortReason := &atomic.Value{}

cancelWithReason := func(err error) {
abortReason.Store(err.Error())
once.Do(abort)
once.Do(func() {
abortReason.Store(err.Error())
cancel()
rc.streamsByID.Delete(streamID)
rc.Metrics.reportEgressStreamCount(rc.Channel, atomic.LoadUint32(&rc.streamsByID.size))
rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint)
atomic.StoreUint32(&canceled, 1)
close(abortChan)
})
}

logger := flogging.MustGetLogger("orderer.common.cluster.step")
Expand Down
49 changes: 49 additions & 0 deletions orderer/common/cluster/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,55 @@ func TestUnavailableHosts(t *testing.T) {
assert.Contains(t, err.Error(), "connection")
}

// TestStreamAbortReportCorrectError checks that once a stream has been aborted
// because of an error, later attempts to use the stream keep reporting that
// first error.
func TestStreamAbortReportCorrectError(t *testing.T) {
	// Scenario: node 1 acquires a stream to node 2 and then the stream
	// encounters an error and as a result, the stream is aborted.
	// We ensure the error reported is the first error, even after
	// multiple attempts of using it.

	node1 := newTestNode(t)
	defer node1.stop()

	node2 := newTestNode(t)
	defer node2.stop()

	node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
	node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})

	// The first (and only the first) submit handled by node 2 fails with "whoops".
	node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(errors.Errorf("whoops")).Once()

	rm1, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
	assert.NoError(t, err)
	var streamTerminated sync.WaitGroup
	streamTerminated.Add(1)

	stream := assertEventualEstablishStream(t, rm1)

	l, err := zap.NewDevelopment()
	assert.NoError(t, err)
	// Replace the stream's logger with one whose hook signals the wait group
	// when a log entry mentions both "Stream 1 to" and "terminated".
	// NOTE(review): this presumes the stream under test has ID 1 — confirm.
	stream.Logger = flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
		if strings.Contains(entry.Message, "Stream 1 to") && strings.Contains(entry.Message, "terminated") {
			streamTerminated.Done()
		}
		return nil
	}))

	// Probe the stream for the first time
	err = stream.Send(wrapSubmitReq(testReq))
	assert.NoError(t, err)

	// We should receive back the crafted error
	_, err = stream.Recv()
	// Guard against a nil error: calling err.Error() on it would panic, and
	// without a failure the wait below may never be released.
	if !assert.Error(t, err) {
		return
	}
	assert.Contains(t, err.Error(), "whoops")

	// Wait for the stream to be terminated from within the communication infrastructure
	streamTerminated.Wait()

	// We should still receive the original crafted error despite the stream being terminated
	err = stream.Send(wrapSubmitReq(testReq))
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "whoops")
	}
}

func TestStreamAbort(t *testing.T) {
// Scenarios: node 1 is connected to node 2 in 2 channels,
// and the consumer of the communication calls receive.
Expand Down

0 comments on commit fae13c3

Please sign in to comment.