Skip to content

Commit

Permalink
Merge "Complete chaincode execution on stream termination" into relea…
Browse files Browse the repository at this point in the history
…se-1.4
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Nov 9, 2019
2 parents 872fffe + c77496c commit 0571934
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
25 changes: 20 additions & 5 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ type Handler struct {
UUIDGenerator UUIDGenerator
// AppConfig is used to retrieve the application config for a channel
AppConfig ApplicationConfigRetriever
// Metrics holds chaincode handler metrics
Metrics *HandlerMetrics

// state holds the current handler state. It will be created, established, or
// ready.
Expand All @@ -166,8 +168,10 @@ type Handler struct {
chatStream ccintf.ChaincodeStream
// errChan is used to communicate errors from the async send to the receive loop
errChan chan error
// Metrics holds chaincode handler metrics
Metrics *HandlerMetrics
// mutex is used to serialze the stream closed chan.
mutex sync.Mutex
// streamDoneChan is closed when the chaincode stream terminates.
streamDoneChan chan struct{}
}

// handleMessage is called by ProcessStream to dispatch messages.
Expand Down Expand Up @@ -389,9 +393,20 @@ func (h *Handler) deregister() {
}
}

func (h *Handler) streamDone() <-chan struct{} {
h.mutex.Lock()
defer h.mutex.Unlock()
return h.streamDoneChan
}

func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
defer h.deregister()

h.mutex.Lock()
h.streamDoneChan = make(chan struct{})
h.mutex.Unlock()
defer close(h.streamDoneChan)

h.chatStream = stream
h.errChan = make(chan error, 1)

Expand Down Expand Up @@ -1249,9 +1264,9 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovi
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
ccName := cccid.Name + ":" + cccid.Version
h.Metrics.ExecuteTimeouts.With(
"chaincode", ccName,
).Add(1)
h.Metrics.ExecuteTimeouts.With("chaincode", ccName).Add(1)
case <-h.streamDone():
err = errors.New("chaincode stream terminated")
}

return ccresp, err
Expand Down
8 changes: 8 additions & 0 deletions core/chaincode/handler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ func SetHandlerChatStream(h *Handler, chatStream ccintf.ChaincodeStream) {
func SetHandlerCCInstance(h *Handler, ccInstance *sysccprovider.ChaincodeInstance) {
h.ccInstance = ccInstance
}

func StreamDone(h *Handler) <-chan struct{} {
return h.streamDone()
}

func SetStreamDoneChan(h *Handler, ch chan struct{}) {
h.streamDoneChan = ch
}
36 changes: 34 additions & 2 deletions core/chaincode/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2583,6 +2583,23 @@ var _ = Describe("Handler", func() {
})
})

Context("when the chaincode stream terminates", func() {
It("returns an error", func() {
streamDoneChan := make(chan struct{})
chaincode.SetStreamDoneChan(handler, streamDoneChan)

errCh := make(chan error, 1)
go func() {
_, err := handler.Execute(txParams, cccid, incomingMessage, time.Hour)
errCh <- err
}()
Consistently(errCh).ShouldNot(Receive())

close(streamDoneChan)
Eventually(errCh).Should(Receive(MatchError("chaincode stream terminated")))
})
})

Context("when execute times out", func() {
It("returns an error", func() {
errCh := make(chan error, 1)
Expand Down Expand Up @@ -2769,6 +2786,22 @@ var _ = Describe("Handler", func() {
Eventually(fakeChatStream.RecvCallCount).Should(Equal(100))
})

It("manages the stream done channel", func() {
releaseChan := make(chan struct{})
fakeChatStream.RecvStub = func() (*pb.ChaincodeMessage, error) {
<-releaseChan
return nil, errors.New("cc-went-away")
}
go handler.ProcessStream(fakeChatStream)
Eventually(fakeChatStream.RecvCallCount).Should(Equal(1))

streamDoneChan := chaincode.StreamDone(handler)
Consistently(streamDoneChan).ShouldNot(Receive())

close(releaseChan)
Eventually(streamDoneChan).Should(BeClosed())
})

Context("when receive fails with an io.EOF", func() {
BeforeEach(func() {
fakeChatStream.RecvReturns(nil, io.EOF)
Expand Down Expand Up @@ -2871,8 +2904,7 @@ var _ = Describe("Handler", func() {
var (
cccid *ccprovider.CCContext
incomingMessage *pb.ChaincodeMessage

recvChan chan *pb.ChaincodeMessage
recvChan chan *pb.ChaincodeMessage
)

BeforeEach(func() {
Expand Down

0 comments on commit 0571934

Please sign in to comment.