Skip to content

Commit

Permalink
Merge pull request #474 from SmartBFT-Go/request_pool
Browse files Browse the repository at this point in the history
Add configuration to control request pool parameters
  • Loading branch information
muzykantov authored Nov 30, 2021
2 parents fe83545 + cd1c0b5 commit 9172384
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 1 deletion.
19 changes: 18 additions & 1 deletion internal/bft/requestpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (

const (
defaultRequestTimeout = 10 * time.Second // for unit tests only
defaultMaxBytes = 100 * 1024 // default max request size would be of size 100Kb
)

var (
ErrReqAlreadyExists = fmt.Errorf("request already exists")
ErrRequestTooBig = fmt.Errorf("submitted request is too big")
ErrSubmitTimeout = fmt.Errorf("timeout submitting to request pool")
)

//go:generate mockery -dir . -name RequestTimeoutHandler -case underscore -output ./mocks/
Expand Down Expand Up @@ -73,6 +76,8 @@ type PoolOptions struct {
ForwardTimeout time.Duration
ComplainTimeout time.Duration
AutoRemoveTimeout time.Duration
RequestMaxBytes uint64
SubmitTimeout time.Duration
}

// NewPool constructs new requests pool
Expand All @@ -86,6 +91,12 @@ func NewPool(log api.Logger, inspector api.RequestInspector, th RequestTimeoutHa
if options.AutoRemoveTimeout == 0 {
options.AutoRemoveTimeout = defaultRequestTimeout
}
if options.RequestMaxBytes == 0 {
options.RequestMaxBytes = defaultMaxBytes
}
if options.SubmitTimeout == 0 {
options.SubmitTimeout = defaultRequestTimeout
}

return &Pool{
timeoutHandler: th,
Expand Down Expand Up @@ -142,6 +153,10 @@ func (rp *Pool) Submit(request []byte) error {
return errors.Errorf("pool closed, request rejected: %s", reqInfo)
}

if uint64(len(request)) > rp.options.RequestMaxBytes {
return ErrRequestTooBig
}

rp.lock.RLock()
_, alreadyExists := rp.existMap[reqInfo]
rp.lock.RUnlock()
Expand All @@ -151,8 +166,10 @@ func (rp *Pool) Submit(request []byte) error {
return ErrReqAlreadyExists
}

ctx, cancel := context.WithTimeout(context.Background(), rp.options.SubmitTimeout)
defer cancel()
// do not wait for a semaphore with a lock, as it will prevent draining the pool.
if err := rp.semaphore.Acquire(context.Background(), 1); err != nil {
if err := rp.semaphore.Acquire(ctx, 1); err != nil {
return errors.Wrapf(err, "acquiring semaphore for request: %s", reqInfo)
}

Expand Down
28 changes: 28 additions & 0 deletions internal/bft/requestpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package bft_test

import (
"bytes"
"crypto/rand"
"encoding/binary"
"fmt"
"sync"
Expand Down Expand Up @@ -291,6 +292,33 @@ func TestReqPoolTimeout(t *testing.T) {
insp := &testRequestInspector{}
submittedChan := make(chan struct{}, 1)

t.Run("request size too big", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}

timeoutHandler.On("OnRequestTimeout", byteReq1, insp.RequestID(byteReq1)).Return()
timeoutHandler.On("OnLeaderFwdRequestTimeout", byteReq1, insp.RequestID(byteReq1)).Return()
timeoutHandler.On("OnAutoRemoveTimeout", insp.RequestID(byteReq1)).Return()

pool := bft.NewPool(log, insp, timeoutHandler,
bft.PoolOptions{
QueueSize: 3,
ForwardTimeout: 10 * time.Millisecond,
ComplainTimeout: time.Hour,
AutoRemoveTimeout: time.Hour,
RequestMaxBytes: 1024,
},
nil,
)
defer pool.Close()

payload := make([]byte, 2048)
rand.Read(payload)
request := makeTestRequest("1", "1", string(payload))
assert.Equal(t, 0, pool.Size())
err = pool.Submit(request)
assert.Equal(t, err, bft.ErrRequestTooBig)

})
t.Run("request timeout", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}

Expand Down
4 changes: 4 additions & 0 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func (c *Consensus) Start() error {
ForwardTimeout: c.Config.RequestForwardTimeout,
ComplainTimeout: c.Config.RequestComplainTimeout,
AutoRemoveTimeout: c.Config.RequestAutoRemoveTimeout,
RequestMaxBytes: c.Config.RequestMaxBytes,
SubmitTimeout: c.Config.RequestPoolSubmitTimeout,
}
c.submittedChan = make(chan struct{}, 1)
c.Pool = algorithm.NewPool(c.Logger, c.RequestInspector, c.controller, opts, c.submittedChan)
Expand Down Expand Up @@ -214,6 +216,8 @@ func (c *Consensus) reconfig(reconfig types.Reconfig) {
ForwardTimeout: c.Config.RequestForwardTimeout,
ComplainTimeout: c.Config.RequestComplainTimeout,
AutoRemoveTimeout: c.Config.RequestAutoRemoveTimeout,
RequestMaxBytes: c.Config.RequestMaxBytes,
SubmitTimeout: c.Config.RequestPoolSubmitTimeout,
}
c.Pool.ChangeTimeouts(c.controller, opts) // TODO handle reconfiguration of queue size in the pool
c.continueCreateComponents()
Expand Down
17 changes: 17 additions & 0 deletions pkg/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ type Configuration struct {
LeaderRotation bool
// DecisionsPerLeader is the number of decisions reached by a leader before there is a leader rotation.
DecisionsPerLeader uint64

// RequestMaxBytes total allowed size of the single request
RequestMaxBytes uint64

// RequestPoolSubmitTimeout the total amount of time client can wait for submission of single
// request into request pool
RequestPoolSubmitTimeout time.Duration
}

// DefaultConfig contains reasonable values for a small cluster that resides on the same geography (or "Region"), but
Expand All @@ -102,6 +109,8 @@ var DefaultConfig = Configuration{
SpeedUpViewChange: false,
LeaderRotation: true,
DecisionsPerLeader: 3,
RequestMaxBytes: 10 * 1024,
RequestPoolSubmitTimeout: 5 * time.Second,
}

func (c Configuration) Validate() error {
Expand Down Expand Up @@ -167,5 +176,13 @@ func (c Configuration) Validate() error {
return errors.Errorf("DecisionsPerLeader should be greater than zero when leader rotation is active")
}

if !(c.RequestMaxBytes > 0) {
return errors.Errorf("RequestMaxBytes should be greater than zero")
}

if !(c.RequestPoolSubmitTimeout > 0) {
return errors.Errorf("RequestPoolSubmitTimeout should be greater than zero")
}

return nil
}
6 changes: 6 additions & 0 deletions test/reconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Configuration struct {
SpeedUpViewChange bool
LeaderRotation bool
DecisionsPerLeader int64
RequestMaxBytes int64
RequestPoolSubmitTimeout time.Duration
}

type Reconfig struct {
Expand Down Expand Up @@ -58,6 +60,8 @@ func (r Reconfig) recconfigToUint(id uint64) types.Reconfig {
SpeedUpViewChange: r.CurrentConfig.SpeedUpViewChange,
LeaderRotation: r.CurrentConfig.LeaderRotation,
DecisionsPerLeader: uint64(r.CurrentConfig.DecisionsPerLeader),
RequestMaxBytes: uint64(r.CurrentConfig.RequestBatchMaxBytes),
RequestPoolSubmitTimeout: r.CurrentConfig.RequestPoolSubmitTimeout,
},
}
}
Expand Down Expand Up @@ -85,6 +89,8 @@ func recconfigToInt(reconfig types.Reconfig) Reconfig {
SpeedUpViewChange: reconfig.CurrentConfig.SpeedUpViewChange,
LeaderRotation: reconfig.CurrentConfig.LeaderRotation,
DecisionsPerLeader: int64(reconfig.CurrentConfig.DecisionsPerLeader),
RequestMaxBytes: int64(reconfig.CurrentConfig.RequestBatchMaxBytes),
RequestPoolSubmitTimeout: reconfig.CurrentConfig.RequestPoolSubmitTimeout,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions test/test_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var fastConfig = types.Configuration{
NumOfTicksBehindBeforeSyncing: 10,
CollectTimeout: 200 * time.Millisecond,
LeaderRotation: false,
RequestMaxBytes: 10 * 1024,
RequestPoolSubmitTimeout: 5 * time.Second,
}

// App implements all interfaces required by an application using this library
Expand Down

0 comments on commit 9172384

Please sign in to comment.