From e19cb88a679a5320f0ba9a2ad4d7c2bdb9336fc1 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Mon, 18 Jul 2022 09:22:36 -0700 Subject: [PATCH 1/5] Increase the allowed concurrent gRPC streams --- vault/cluster/cluster.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vault/cluster/cluster.go b/vault/cluster/cluster.go index a20245147a8e..3219a95570b1 100644 --- a/vault/cluster/cluster.go +++ b/vault/cluster/cluster.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + "math" "net" "net/url" "os" @@ -81,6 +82,10 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo // Our forwarding connections heartbeat regularly so anything else we // want to go away/get cleaned up pretty rapidly IdleTimeout: idleTimeout, + + // By default this is 250 which can be too small on high traffic + // clusters with many forwarded or replication gRPC connections. + MaxConcurrentStreams: math.MaxUint32, } return &Listener{ From 3f069e55dd5e91ec03be530e5fa5803a8dba48c7 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Wed, 20 Jul 2022 10:59:10 -0700 Subject: [PATCH 2/5] Add a env override for the max streams setting --- vault/cluster/cluster.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/vault/cluster/cluster.go b/vault/cluster/cluster.go index 3219a95570b1..b0aca198b282 100644 --- a/vault/cluster/cluster.go +++ b/vault/cluster/cluster.go @@ -10,6 +10,7 @@ import ( "net" "net/url" "os" + "strconv" "sync" "sync/atomic" "time" @@ -74,6 +75,20 @@ type Listener struct { } func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener { + + maxStreams := math.MaxUint32 + if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" { + i, err := strconv.Atoi(override) + if err != nil { + logger.Warn("vault grpc max streams override must be an integer", "value", override) + } else if i < 0 || i > math.MaxUint32 { + logger.Warn("vault grpc max streams override out of range", "value", i) + } else { + maxStreams = i + logger.Info("overriding grpc max streams", "value", i) + } + } + // Create the HTTP/2 server that will be shared by both RPC and regular // duties. Doing it this way instead of listening via the server and gRPC // allows us to re-use the same port via ALPN. We can just tell the server @@ -85,7 +100,7 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo // By default this is 250 which can be too small on high traffic // clusters with many forwarded or replication gRPC connections. - MaxConcurrentStreams: math.MaxUint32, + MaxConcurrentStreams: uint32(maxStreams), } return &Listener{ From b8e48d02f77e9ee9e152c87f1cc1f29e71550c05 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Wed, 20 Jul 2022 11:32:25 -0700 Subject: [PATCH 3/5] Add changelog --- changelog/16327.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog/16327.txt diff --git a/changelog/16327.txt b/changelog/16327.txt new file mode 100644 index 000000000000..da22993c02f7 --- /dev/null +++ b/changelog/16327.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Increase the allowed concurrent gRPC streams over the cluster port. +``` From a97273a730406e195e0ba1192945eae6c742690b Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Wed, 20 Jul 2022 11:36:29 -0700 Subject: [PATCH 4/5] go fmt --- vault/cluster/cluster.go | 1 - 1 file changed, 1 deletion(-) diff --git a/vault/cluster/cluster.go b/vault/cluster/cluster.go index b0aca198b282..c699a9e595c7 100644 --- a/vault/cluster/cluster.go +++ b/vault/cluster/cluster.go @@ -75,7 +75,6 @@ type Listener struct { } func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener { - maxStreams := math.MaxUint32 if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" { i, err := strconv.Atoi(override) From 7facd6b14abb442b6f84e5e84701b37d54c7bd86 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Wed, 20 Jul 2022 12:12:30 -0700 Subject: [PATCH 5/5] fix builds on 32bit systems --- vault/cluster/cluster.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/vault/cluster/cluster.go b/vault/cluster/cluster.go index c699a9e595c7..fca8ca8967ea 100644 --- a/vault/cluster/cluster.go +++ b/vault/cluster/cluster.go @@ -75,15 +75,13 @@ type Listener struct { } func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener { - maxStreams := math.MaxUint32 + var maxStreams uint32 = math.MaxUint32 if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" { - i, err := strconv.Atoi(override) + i, err := strconv.ParseUint(override, 10, 32) if err != nil { - logger.Warn("vault grpc max streams override must be an integer", "value", override) - } else if i < 0 || i > math.MaxUint32 { - logger.Warn("vault grpc max streams override out of range", "value", i) + logger.Warn("vault grpc max streams override must be an uint32 integer", "value", override) } else { - maxStreams = i + maxStreams = uint32(i) logger.Info("overriding grpc max streams", "value", i) } } @@ -99,7 +97,7 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo // By default this is 250 which can be too small on high traffic // clusters with many forwarded or replication gRPC connections. - MaxConcurrentStreams: uint32(maxStreams), + MaxConcurrentStreams: maxStreams, } return &Listener{