Skip to content

Commit

Permalink
[FIXED] Don't timeout for retried AckAll (#6392)
Browse files Browse the repository at this point in the history
During server restarts (for example) a response to an `AckSync` could be
lost which leads into a timeout. If the client would then retry the
`AckSync` it should eventually come back successful.

This was true for `AckExplicit`, but for `AckAll` the retried `AckSync`
would always timeout. In this case the server would not respond to the
client at all, making it impossible for the client to know if the ack
for that message went through.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Jan 21, 2025
2 parents 7c25ec0 + 2d05487 commit 04c8dcb
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
3 changes: 2 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3134,7 +3134,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return ackInPlace
// Return true to let caller respond back to the client.
return true
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
Expand Down
44 changes: 44 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2534,3 +2534,47 @@ func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *t
require_NoError(t, err)
require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3))
}

func TestJetStreamConsumerRetryAckAfterTimeout(t *testing.T) {
for _, ack := range []struct {
title string
policy nats.SubOpt
}{
{title: "AckExplicit", policy: nats.AckExplicit()},
{title: "AckAll", policy: nats.AckAll()},
} {
t.Run(ack.title, func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)

sub, err := js.PullSubscribe("foo", "CONSUMER", ack.policy)
require_NoError(t, err)

msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)

msg := msgs[0]
// Send core request so the client is unaware of the ack being sent.
_, err = nc.Request(msg.Reply, nil, time.Second)
require_NoError(t, err)

// It could be we have already acked this specific message, but we haven't received the success response.
// Retrying the ack should not time out and still signal success.
err = msg.AckSync()
require_NoError(t, err)
})
}
}

0 comments on commit 04c8dcb

Please sign in to comment.