diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index b7e418197cf..cb864803d4c 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -149,6 +149,8 @@ type Handler struct { UUIDGenerator UUIDGenerator // AppConfig is used to retrieve the application config for a channel AppConfig ApplicationConfigRetriever + // Metrics holds chaincode handler metrics + Metrics *HandlerMetrics // state holds the current handler state. It will be created, established, or // ready. @@ -166,8 +168,10 @@ type Handler struct { chatStream ccintf.ChaincodeStream // errChan is used to communicate errors from the async send to the receive loop errChan chan error - // Metrics holds chaincode handler metrics - Metrics *HandlerMetrics + // mutex is used to serialze the stream closed chan. + mutex sync.Mutex + // streamDoneChan is closed when the chaincode stream terminates. + streamDoneChan chan struct{} } // handleMessage is called by ProcessStream to dispatch messages. @@ -389,9 +393,20 @@ func (h *Handler) deregister() { } } +func (h *Handler) streamDone() <-chan struct{} { + h.mutex.Lock() + defer h.mutex.Unlock() + return h.streamDoneChan +} + func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error { defer h.deregister() + h.mutex.Lock() + h.streamDoneChan = make(chan struct{}) + h.mutex.Unlock() + defer close(h.streamDoneChan) + h.chatStream = stream h.errChan = make(chan error, 1) @@ -1249,9 +1264,9 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovi case <-time.After(timeout): err = errors.New("timeout expired while executing transaction") ccName := cccid.Name + ":" + cccid.Version - h.Metrics.ExecuteTimeouts.With( - "chaincode", ccName, - ).Add(1) + h.Metrics.ExecuteTimeouts.With("chaincode", ccName).Add(1) + case <-h.streamDone(): + err = errors.New("chaincode stream terminated") } return ccresp, err diff --git a/core/chaincode/handler_internal_test.go b/core/chaincode/handler_internal_test.go index 07b556fff65..78b6a0222f4 100644 --- a/core/chaincode/handler_internal_test.go +++ b/core/chaincode/handler_internal_test.go @@ -25,3 +25,11 @@ func SetHandlerChatStream(h *Handler, chatStream ccintf.ChaincodeStream) { func SetHandlerCCInstance(h *Handler, ccInstance *sysccprovider.ChaincodeInstance) { h.ccInstance = ccInstance } + +func StreamDone(h *Handler) <-chan struct{} { + return h.streamDone() +} + +func SetStreamDoneChan(h *Handler, ch chan struct{}) { + h.streamDoneChan = ch +} diff --git a/core/chaincode/handler_test.go b/core/chaincode/handler_test.go index c224b8642dd..076597cc016 100644 --- a/core/chaincode/handler_test.go +++ b/core/chaincode/handler_test.go @@ -2583,6 +2583,23 @@ var _ = Describe("Handler", func() { }) }) + Context("when the chaincode stream terminates", func() { + It("returns an error", func() { + streamDoneChan := make(chan struct{}) + chaincode.SetStreamDoneChan(handler, streamDoneChan) + + errCh := make(chan error, 1) + go func() { + _, err := handler.Execute(txParams, cccid, incomingMessage, time.Hour) + errCh <- err + }() + Consistently(errCh).ShouldNot(Receive()) + + close(streamDoneChan) + Eventually(errCh).Should(Receive(MatchError("chaincode stream terminated"))) + }) + }) + Context("when execute times out", func() { It("returns an error", func() { errCh := make(chan error, 1) @@ -2769,6 +2786,22 @@ var _ = Describe("Handler", func() { Eventually(fakeChatStream.RecvCallCount).Should(Equal(100)) }) + It("manages the stream done channel", func() { + releaseChan := make(chan struct{}) + fakeChatStream.RecvStub = func() (*pb.ChaincodeMessage, error) { + <-releaseChan + return nil, errors.New("cc-went-away") + } + go handler.ProcessStream(fakeChatStream) + Eventually(fakeChatStream.RecvCallCount).Should(Equal(1)) + + streamDoneChan := chaincode.StreamDone(handler) + Consistently(streamDoneChan).ShouldNot(Receive()) + + close(releaseChan) + Eventually(streamDoneChan).Should(BeClosed()) + }) + Context("when receive fails with an io.EOF", func() { BeforeEach(func() { fakeChatStream.RecvReturns(nil, io.EOF) @@ -2871,8 +2904,7 @@ var _ = Describe("Handler", func() { var ( cccid *ccprovider.CCContext incomingMessage *pb.ChaincodeMessage - - recvChan chan *pb.ChaincodeMessage + recvChan chan *pb.ChaincodeMessage ) BeforeEach(func() {