Skip to content

Commit

Permalink
Merge pull request hyperledger-archives#598 from silasdavis/fix-sub-t…
Browse files Browse the repository at this point in the history
…est-race

Fix broken unsubscribe…
  • Loading branch information
benjaminbollen authored May 4, 2017
2 parents e0225b3 + 3cc9f9a commit 7454c0a
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 30 deletions.
13 changes: 8 additions & 5 deletions client/websocket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/tendermint/go-wire"

"github.com/hyperledger/burrow/logging"
tendermint_client "github.com/hyperledger/burrow/rpc/tendermint/client"
ctypes "github.com/hyperledger/burrow/rpc/tendermint/core/types"
"github.com/hyperledger/burrow/txs"
)
Expand All @@ -50,14 +51,16 @@ type burrowNodeWebsocketClient struct {
}

// Subscribe to an eventid
func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) Subscribe(eventid string) error {
func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) Subscribe(eventId string) error {
// TODO we can in the background listen to the subscription id and remember it to ease unsubscribing later.
return burrowNodeWebsocketClient.tendermintWebsocket.Subscribe(eventid)
return tendermint_client.Subscribe(burrowNodeWebsocketClient.tendermintWebsocket,
eventId)
}

// Unsubscribe from an eventid
func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) Unsubscribe(subscriptionId string) error {
return burrowNodeWebsocketClient.tendermintWebsocket.Unsubscribe(subscriptionId)
return tendermint_client.Unsubscribe(burrowNodeWebsocketClient.tendermintWebsocket,
subscriptionId)
}

// Returns a channel that will receive a confirmation with a result or the exception that
Expand All @@ -73,10 +76,10 @@ func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) WaitForConfirmation(
var latestBlockHash []byte

eid := txs.EventStringAccInput(inputAddr)
if err := burrowNodeWebsocketClient.tendermintWebsocket.Subscribe(eid); err != nil {
if err := burrowNodeWebsocketClient.Subscribe(eid); err != nil {
return nil, fmt.Errorf("Error subscribing to AccInput event (%s): %v", eid, err)
}
if err := burrowNodeWebsocketClient.tendermintWebsocket.Subscribe(txs.EventStringNewBlock()); err != nil {
if err := burrowNodeWebsocketClient.Subscribe(txs.EventStringNewBlock()); err != nil {
return nil, fmt.Errorf("Error subscribing to NewBlock event: %v", err)
}
// Read the incoming events
Expand Down
2 changes: 1 addition & 1 deletion definitions/tendermint_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type TendermintPipe interface {
// Subscribe attempts to subscribe the listener identified by listenerId to
// the event named event. The Event result is written to rpcResponseWriter
// which must be non-blocking
Subscribe(event string,
Subscribe(eventId string,
rpcResponseWriter func(result rpc_tm_types.BurrowResult)) (*rpc_tm_types.ResultSubscribe, error)
Unsubscribe(subscriptionId string) (*rpc_tm_types.ResultUnsubscribe, error)

Expand Down
14 changes: 8 additions & 6 deletions manager/burrow-mint/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,24 +246,26 @@ func (pipe *burrowMintPipe) consensusAndManagerEvents() edb_event.EventEmitter {

//------------------------------------------------------------------------------
// Implement definitions.TendermintPipe for burrowMintPipe
func (pipe *burrowMintPipe) Subscribe(event string,
func (pipe *burrowMintPipe) Subscribe(eventId string,
rpcResponseWriter func(result rpc_tm_types.BurrowResult)) (*rpc_tm_types.ResultSubscribe, error) {
subscriptionId, err := edb_event.GenerateSubId()
if err != nil {
return nil, err
logging.InfoMsg(pipe.logger, "Subscribing to event",
"event", event, "subscriptionId", subscriptionId)
"eventId", eventId, "subscriptionId", subscriptionId)
}
pipe.consensusAndManagerEvents().Subscribe(subscriptionId, event,
pipe.consensusAndManagerEvents().Subscribe(subscriptionId, eventId,
func(eventData txs.EventData) {
result := rpc_tm_types.BurrowResult(&rpc_tm_types.ResultEvent{event,
txs.EventData(eventData)})
result := rpc_tm_types.BurrowResult(
&rpc_tm_types.ResultEvent{
Event: eventId,
Data: txs.EventData(eventData)})
// NOTE: EventSwitch callbacks must be nonblocking
rpcResponseWriter(result)
})
return &rpc_tm_types.ResultSubscribe{
SubscriptionId: subscriptionId,
Event: event,
Event: eventId,
}, nil
}

Expand Down
39 changes: 39 additions & 0 deletions rpc/tendermint/client/websocket_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2017 Monax Industries Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import "github.com/tendermint/go-rpc/types"

type WebsocketClient interface {
WriteJSON(v interface{}) error
}

func Subscribe(websocketClient WebsocketClient, eventId string) error {
return websocketClient.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "subscribe",
Params: map[string]interface{}{"eventId": eventId},
})
}

func Unsubscribe(websocketClient WebsocketClient, subscriptionId string) error {
return websocketClient.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "unsubscribe",
Params: map[string]interface{}{"subscriptionId": subscriptionId},
})
}
6 changes: 3 additions & 3 deletions rpc/tendermint/core/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type TendermintRoutes struct {

func (tmRoutes *TendermintRoutes) GetRoutes() map[string]*rpc.RPCFunc {
var routes = map[string]*rpc.RPCFunc{
"subscribe": rpc.NewWSRPCFunc(tmRoutes.Subscribe, "event"),
"subscribe": rpc.NewWSRPCFunc(tmRoutes.Subscribe, "eventId"),
"unsubscribe": rpc.NewWSRPCFunc(tmRoutes.Unsubscribe, "subscriptionId"),
"status": rpc.NewRPCFunc(tmRoutes.StatusResult, ""),
"net_info": rpc.NewRPCFunc(tmRoutes.NetInfoResult, ""),
Expand Down Expand Up @@ -66,15 +66,15 @@ func (tmRoutes *TendermintRoutes) GetRoutes() map[string]*rpc.RPCFunc {
}

func (tmRoutes *TendermintRoutes) Subscribe(wsCtx rpctypes.WSRPCContext,
event string) (ctypes.BurrowResult, error) {
eventId string) (ctypes.BurrowResult, error) {
// NOTE: RPCResponses of subscribed events have id suffix "#event"
// TODO: we really ought to allow multiple subscriptions from the same client address
// to the same event. The code as it stands reflects the somewhat broken tendermint
// implementation. We can use GenerateSubId to randomize the subscriptions id
// and return it in the result. This would require clients to hang on to a
// subscription id if they wish to unsubscribe, but then again they can just
// drop their connection
result, err := tmRoutes.tendermintPipe.Subscribe(event,
result, err := tmRoutes.tendermintPipe.Subscribe(eventId,
func(result ctypes.BurrowResult) {
wsCtx.GetRemoteAddr()
// NOTE: EventSwitch callbacks must be nonblocking
Expand Down
24 changes: 13 additions & 11 deletions rpc/tendermint/test/websocket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ func TestSubscribe(t *testing.T) {
var subId string
subscribe(t, wsc, txs.EventStringNewBlock())

timeout := time.NewTimer(timeoutSeconds * time.Second)
// timeout to check subscription process is live
timeout := time.After(timeoutSeconds * time.Second)
Subscribe:
for {
select {
case <-timeout.C:
case <-timeout:
t.Fatal("Timed out waiting for subscription result")

case bs := <-wsc.ResultsCh:
Expand All @@ -277,12 +278,13 @@ Subscribe:
}
}

seenBlock := false
timeout = time.NewTimer(timeoutSeconds * time.Second)
blocksSeen := 0
for {
select {
case <-timeout.C:
if !seenBlock {
// wait long enough to check we don't see another new block event even though
// a block will have come
case <-time.After(expectBlockInSeconds * time.Second):
if blocksSeen == 0 {
t.Fatal("Timed out without seeing a NewBlock event")
}
return
Expand All @@ -292,13 +294,13 @@ Subscribe:
if ok {
_, ok := resultEvent.Data.(txs.EventDataNewBlock)
if ok {
if seenBlock {
// There's a mild race here, but when we enter we've just seen a block
// so we should be able to unsubscribe before we see another block
if blocksSeen > 1 {
t.Fatal("Continued to see NewBlock event after unsubscribing")
} else {
seenBlock = true
unsubscribe(t, wsc, subId)
if blocksSeen == 0 {
unsubscribe(t, wsc, subId)
}
blocksSeen++
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions rpc/tendermint/test/websocket_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
)

const (
timeoutSeconds = 2
timeoutSeconds = 2
expectBlockInSeconds = timeoutSeconds * 2
)

//--------------------------------------------------------------------------------
Expand All @@ -48,14 +49,14 @@ func newWSClient() *rpcclient.WSClient {

// subscribe to an event
func subscribe(t *testing.T, wsc *rpcclient.WSClient, eventId string) {
if err := wsc.Subscribe(eventId); err != nil {
if err := burrow_client.Subscribe(wsc, eventId); err != nil {
t.Fatal(err)
}
}

func subscribeAndGetSubscriptionId(t *testing.T, wsc *rpcclient.WSClient,
eventId string) string {
if err := wsc.Subscribe(eventId); err != nil {
if err := burrow_client.Subscribe(wsc, eventId); err != nil {
t.Fatal(err)
}

Expand All @@ -75,7 +76,7 @@ func subscribeAndGetSubscriptionId(t *testing.T, wsc *rpcclient.WSClient,

// unsubscribe from an event
func unsubscribe(t *testing.T, wsc *rpcclient.WSClient, subscriptionId string) {
if err := wsc.Unsubscribe(subscriptionId); err != nil {
if err := burrow_client.Unsubscribe(wsc, subscriptionId); err != nil {
t.Fatal(err)
}
}
Expand Down

0 comments on commit 7454c0a

Please sign in to comment.