Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Clear all pre-acks for seq upon removing message #6325

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3098,8 +3098,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
// Skip and update our lseq.
last := mset.store.SkipMsg()
mset.mu.Lock()
mset.setLastSeq(last)
mset.clearAllPreAcks(last)
mset.mu.Unlock()
continue
}

Expand Down Expand Up @@ -8805,18 +8807,18 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
return 0, err
}

mset.mu.Lock()
defer mset.mu.Unlock()
// Update our lseq.
mset.setLastSeq(seq)

// Check for MsgId and if we have one here make sure to update our internal map.
if len(hdr) > 0 {
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
if !ddloaded {
mset.mu.Lock()
mset.rebuildDedupe()
mset.mu.Unlock()
}
mset.storeMsgId(&ddentry{msgId, seq, ts})
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
}

Expand Down
93 changes: 93 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6999,6 +6999,99 @@ func TestJetStreamClusterStreamUpscalePeersAfterDownscale(t *testing.T) {
checkPeerSet()
}

// TestJetStreamClusterClearAllPreAcksOnRemoveMsg verifies that pre-acks
// registered for a stream sequence are cleared on every server in the
// cluster when the message for that sequence is removed, whether via
// DeleteMsg, SecureDeleteMsg, or a purge up to a sequence floor.
func TestJetStreamClusterClearAllPreAcksOnRemoveMsg(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Work-queue retention with an explicit-ack consumer, so acks are the
	// path that normally removes messages from the stream.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Replicas:  3,
		Retention: nats.WorkQueuePolicy,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "CONSUMER",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// Publish sequences 1..3.
	for i := 0; i < 3; i++ {
		_, err = js.Publish("foo", nil)
		require_NoError(t, err)
	}

	// Wait for all servers to converge on the same state.
	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
		return checkState(t, c, globalAccountName, "TEST")
	})

	// Register pre-acks on all servers.
	// Normally this can't happen as the stream leader will have the message that's acked available, just for testing.
	for _, s := range c.servers {
		acc, err := s.lookupAccount(globalAccountName)
		require_NoError(t, err)
		mset, err := acc.lookupStream("TEST")
		require_NoError(t, err)
		o := mset.lookupConsumer("CONSUMER")
		require_NotNil(t, o)

		// Register pre-acks for the 3 messages.
		mset.registerPreAckLock(o, 1)
		mset.registerPreAckLock(o, 2)
		mset.registerPreAckLock(o, 3)
	}

	// Check there's an expected amount of pre-acks, and there are no pre-acks for the given sequence.
	// seq == 0 only checks the total count, since no message has been removed yet.
	checkPreAcks := func(seq uint64, expected int) {
		t.Helper()
		checkFor(t, 5*time.Second, time.Second, func() error {
			for _, s := range c.servers {
				acc, err := s.lookupAccount(globalAccountName)
				if err != nil {
					return err
				}
				mset, err := acc.lookupStream("TEST")
				if err != nil {
					return err
				}
				// Snapshot both counters under the stream read lock.
				mset.mu.RLock()
				numPreAcks := len(mset.preAcks)
				numSeqPreAcks := len(mset.preAcks[seq])
				mset.mu.RUnlock()
				if numPreAcks != expected {
					return fmt.Errorf("expected %d pre-acks, got %d", expected, numPreAcks)
				}
				if seq > 0 && numSeqPreAcks != 0 {
					return fmt.Errorf("expected 0 pre-acks for seq %d, got %d", seq, numSeqPreAcks)
				}
			}
			return nil
		})
	}
	// Check all pre-acks were registered.
	checkPreAcks(0, 3)

	// Deleting the message should clear the pre-ack.
	err = js.DeleteMsg("TEST", 1)
	require_NoError(t, err)
	checkPreAcks(1, 2)

	// Erasing the message should clear the pre-ack.
	err = js.SecureDeleteMsg("TEST", 2)
	require_NoError(t, err)
	checkPreAcks(2, 1)

	// Purging should clear all pre-acks below the purged floor.
	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 4})
	require_NoError(t, err)
	checkPreAcks(3, 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
77 changes: 46 additions & 31 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7738,32 +7738,47 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T)
// make sure we do not remove prematurely.
msgs, err := sub.Fetch(100, nats.MaxWait(time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == 100)
require_Len(t, len(msgs), 100)
for _, m := range msgs {
m.AckSync()
}

ci, err := js.ConsumerInfo("EVENTS", "D")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(numToSend-100))
require_True(t, ci.NumAckPending == 0)
require_True(t, ci.Delivered.Stream == 100)
require_True(t, ci.AckFloor.Stream == 100)
require_Equal(t, ci.NumPending, uint64(numToSend-100))
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.Delivered.Stream, 100)
require_Equal(t, ci.AckFloor.Stream, 100)

// Check stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 900)
require_True(t, state.FirstSeq == 101)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
}
// Since acks result in messages to be removed through proposals,
// it could take some time to be reflected in the stream state.
checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
if err != nil {
return err
}
state := mset.state()
if state.Msgs != 900 {
return fmt.Errorf("expected state.Msgs=900, got %d", state.Msgs)
}
if state.FirstSeq != 101 {
return fmt.Errorf("expected state.FirstSeq=101, got %d", state.FirstSeq)
}
if state.LastSeq != 1000 {
return fmt.Errorf("expected state.LastSeq=1000, got %d", state.LastSeq)
}
if state.Consumers != 2 {
return fmt.Errorf("expected state.Consumers=2, got %d", state.Consumers)
}
}
return nil
})

msgs, err = sub.Fetch(900, nats.MaxWait(time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == 900)
require_Len(t, len(msgs), 900)
for _, m := range msgs {
m.AckSync()
}
Expand All @@ -7776,15 +7791,15 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T)
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 0)
require_True(t, state.FirstSeq == 1001)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
require_Equal(t, state.Msgs, 0)
require_Equal(t, state.FirstSeq, 1001)
require_Equal(t, state.LastSeq, 1000)
require_Equal(t, state.Consumers, 2)
// Now check preAcks
mset.mu.RLock()
numPreAcks := len(mset.preAcks)
mset.mu.RUnlock()
require_True(t, numPreAcks == 0)
require_Len(t, numPreAcks, 0)
}
}

Expand Down Expand Up @@ -7872,27 +7887,27 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleFilteredConsumers(t *te

ci, err := js.ConsumerInfo("EVENTS", "D")
require_NoError(t, err)
require_True(t, ci.NumPending == 0)
require_True(t, ci.NumAckPending == 0)
require_True(t, ci.Delivered.Consumer == 500)
require_True(t, ci.Delivered.Stream == 1000)
require_True(t, ci.AckFloor.Consumer == 500)
require_True(t, ci.AckFloor.Stream == 1000)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.Delivered.Consumer, 500)
require_Equal(t, ci.Delivered.Stream, 1000)
require_Equal(t, ci.AckFloor.Consumer, 500)
require_Equal(t, ci.AckFloor.Stream, 1000)

// Check final stream state on all servers.
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("EVENTS")
require_NoError(t, err)
state := mset.state()
require_True(t, state.Msgs == 0)
require_True(t, state.FirstSeq == 1001)
require_True(t, state.LastSeq == 1000)
require_True(t, state.Consumers == 2)
require_Equal(t, state.Msgs, 0)
require_Equal(t, state.FirstSeq, 1001)
require_Equal(t, state.LastSeq, 1000)
require_Equal(t, state.Consumers, 2)
// Now check preAcks
mset.mu.RLock()
numPreAcks := len(mset.preAcks)
mset.mu.RUnlock()
require_True(t, numPreAcks == 0)
require_Len(t, numPreAcks, 0)
}
}

Expand Down
36 changes: 24 additions & 12 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,10 +1126,10 @@ func (mset *stream) lastSeq() uint64 {
return mset.lseq
}

// Set last seq.
// Write lock should be held.
func (mset *stream) setLastSeq(lseq uint64) {
mset.mu.Lock()
mset.lseq = lseq
mset.mu.Unlock()
}

func (mset *stream) sendCreateAdvisory() {
Expand Down Expand Up @@ -2188,11 +2188,16 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
store.FastState(&state)
fseq, lseq := state.FirstSeq, state.LastSeq

mset.mu.Lock()
// Check if our last has moved past what our original last sequence was, if so reset.
if lseq > mlseq {
mset.setLastSeq(lseq)
}

// Clear any pending acks below first seq.
mset.clearAllPreAcksBelowFloor(fseq)
mset.mu.Unlock()

// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
Expand Down Expand Up @@ -2239,15 +2244,29 @@ func (mset *stream) deleteMsg(seq uint64) (bool, error) {
if mset.closed.Load() {
return false, errStreamClosed
}
return mset.store.RemoveMsg(seq)
removed, err := mset.store.RemoveMsg(seq)
if err != nil {
return removed, err
}
mset.mu.Lock()
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
return removed, err
}

// EraseMsg will securely remove a message and rewrite the data with random data.
func (mset *stream) eraseMsg(seq uint64) (bool, error) {
if mset.closed.Load() {
return false, errStreamClosed
}
return mset.store.EraseMsg(seq)
removed, err := mset.store.EraseMsg(seq)
if err != nil {
return removed, err
}
mset.mu.Lock()
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
return removed, err
}

// Are we a mirror?
Expand Down Expand Up @@ -4138,15 +4157,8 @@ func (mset *stream) purgeMsgIds() {
}
}

// storeMsgId will store the message id for duplicate detection.
func (mset *stream) storeMsgId(dde *ddentry) {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.storeMsgIdLocked(dde)
}

// storeMsgIdLocked will store the message id for duplicate detection.
// Lock should he held.
// Lock should be held.
func (mset *stream) storeMsgIdLocked(dde *ddentry) {
if mset.ddmap == nil {
mset.ddmap = make(map[string]*ddentry)
Expand Down
Loading