From 909d0fe1506246d2f6305900d031fc205b2d7618 Mon Sep 17 00:00:00 2001 From: zghh <1069308575@qq.com> Date: Thu, 11 Aug 2022 11:10:01 +0800 Subject: [PATCH 1/2] Fix inconsistent state between WAL and saved Snapshot Signed-off-by: zghh <1069308575@qq.com> --- orderer/consensus/etcdraft/storage.go | 97 ++++++++++++++-------- orderer/consensus/etcdraft/storage_test.go | 10 +-- 2 files changed, 66 insertions(+), 41 deletions(-) diff --git a/orderer/consensus/etcdraft/storage.go b/orderer/consensus/etcdraft/storage.go index 29fa2f09dca..00923d438dc 100644 --- a/orderer/consensus/etcdraft/storage.go +++ b/orderer/consensus/etcdraft/storage.go @@ -75,22 +75,9 @@ func CreateStorage( return nil, err } - snapshot, err := sn.Load() + snapshot, w, st, ents, err := loadNewestAvailableSnapshot(lg, walDir, snapDir) if err != nil { - if err == snap.ErrNoSnapshot { - lg.Debugf("No snapshot found at %s", snapDir) - } else { - return nil, errors.Errorf("failed to load snapshot: %s", err) - } - } else { - // snapshot found - lg.Debugf("Loaded snapshot at Term %d and Index %d, Nodes: %+v", - snapshot.Metadata.Term, snapshot.Metadata.Index, snapshot.Metadata.ConfState.Nodes) - } - - w, st, ents, err := createOrReadWAL(lg, walDir, snapshot) - if err != nil { - return nil, errors.Errorf("failed to create or read WAL: %s", err) + return nil, errors.Errorf("Failed to load snapshot and WAL: %s", err) } if snapshot != nil { @@ -120,26 +107,11 @@ func CreateStorage( // ListSnapshots returns a list of RaftIndex of snapshots stored on disk. // If a file is corrupted, rename the file. 
func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64 { - dir, err := os.Open(snapDir) + snapfiles, err := listSnapshotFiles(logger, snapDir) if err != nil { - logger.Errorf("Failed to open snapshot directory %s: %s", snapDir, err) + logger.Errorf("Failed to list snapshot files from %s: %s", snapDir, err) return nil } - defer dir.Close() - - filenames, err := dir.Readdirnames(-1) - if err != nil { - logger.Errorf("Failed to read snapshot files: %s", err) - return nil - } - - snapfiles := []string{} - for i := range filenames { - if strings.HasSuffix(filenames[i], ".snap") { - snapfiles = append(snapfiles, filenames[i]) - } - } - sort.Strings(snapfiles) var snapshots []uint64 for _, snapfile := range snapfiles { @@ -242,15 +214,17 @@ func (rs *RaftStorage) Snapshot() raftpb.Snapshot { // Store persists etcd/raft data func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error { - if err := rs.wal.Save(hardstate, entries); err != nil { - return err - } - if !raft.IsEmptySnap(snapshot) { if err := rs.saveSnap(snapshot); err != nil { return err } + } + + if err := rs.wal.Save(hardstate, entries); err != nil { + return err + } + if !raft.IsEmptySnap(snapshot) { if err := rs.ram.ApplySnapshot(snapshot); err != nil { if err == raft.ErrSnapOutOfDate { rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d", @@ -447,3 +421,54 @@ func (rs *RaftStorage) Close() error { return nil } + +func loadNewestAvailableSnapshot(lg *flogging.FabricLogger, walDir, snapDir string) (*raftpb.Snapshot, *wal.WAL, raftpb.HardState, []raftpb.Entry, error) { + snapfiles, err := listSnapshotFiles(lg, snapDir) + if err != nil { + lg.Errorf("Failed to list snapshot files from %s: %s", snapDir, err) + } + for i := len(snapfiles) - 1; i >= 0; i-- { + snapshot, err := snap.Read(lg.Zap(), filepath.Join(snapDir, snapfiles[i])) + if err != nil { + lg.Warnf("Can not read snapshot from %s: %s", snapfiles[i], err) 
+ continue + } + w, st, ents, err := createOrReadWAL(lg, walDir, snapshot) + if err != nil { + lg.Warnf("Create or read wal error: %s", err) + continue + } + if snapshot.Metadata.Index <= st.Commit { + return snapshot, w, st, ents, nil + } + if err := w.Close(); err != nil { + return nil, nil, raftpb.HardState{}, nil, err + } + } + lg.Warnf("Not available snapshot found in %s", snapDir) + w, st, ents, err := createOrReadWAL(lg, walDir, nil) + return nil, w, st, ents, err +} + +func listSnapshotFiles(logging *flogging.FabricLogger, snapDir string) ([]string, error) { + dir, err := os.Open(snapDir) + if err != nil { + return nil, err + } + defer dir.Close() + + filenames, err := dir.Readdirnames(-1) + if err != nil { + return nil, err + } + + snapfiles := []string{} + for i := range filenames { + if strings.HasSuffix(filenames[i], ".snap") { + snapfiles = append(snapfiles, filenames[i]) + } + } + sort.Strings(snapfiles) + + return snapfiles, nil +} diff --git a/orderer/consensus/etcdraft/storage_test.go b/orderer/consensus/etcdraft/storage_test.go index cd7233f4a46..3668714594c 100644 --- a/orderer/consensus/etcdraft/storage_test.go +++ b/orderer/consensus/etcdraft/storage_test.go @@ -80,7 +80,7 @@ func TestOpenWAL(t *testing.T) { for i := 0; i < 10; i++ { store.Store( []raftpb.Entry{{Index: uint64(i), Data: make([]byte, 10)}}, - raftpb.HardState{}, + raftpb.HardState{Commit: uint64(i)}, raftpb.Snapshot{}, ) } @@ -155,7 +155,7 @@ func TestTakeSnapshot(t *testing.T) { for i := 0; i < 10; i++ { store.Store( []raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}}, - raftpb.HardState{}, + raftpb.HardState{Commit: uint64(i)}, raftpb.Snapshot{}, ) } @@ -216,7 +216,7 @@ func TestTakeSnapshot(t *testing.T) { for i := 0; i < 10; i++ { store.Store( []raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}}, - raftpb.HardState{}, + raftpb.HardState{Commit: uint64(i)}, raftpb.Snapshot{}, ) } @@ -282,7 +282,7 @@ func TestTakeSnapshot(t *testing.T) { for i := 0; i < 10; i++ { 
store.Store( []raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}}, - raftpb.HardState{}, + raftpb.HardState{Commit: uint64(i)}, raftpb.Snapshot{}, ) } @@ -369,7 +369,7 @@ func TestApplyOutOfDateSnapshot(t *testing.T) { for i := 0; i < 10; i++ { store.Store( []raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}}, - raftpb.HardState{}, + raftpb.HardState{Commit: uint64(i)}, raftpb.Snapshot{}, ) } From 82a897ed739a5fdd7ce0921ee69114a26290763a Mon Sep 17 00:00:00 2001 From: zghh <1069308575@qq.com> Date: Thu, 11 Aug 2022 20:03:06 +0800 Subject: [PATCH 2/2] Fix inconsistent state between WAL and saved Snapshot, and add the unit test to reproduce the problem. Signed-off-by: zghh <1069308575@qq.com> --- orderer/consensus/etcdraft/storage_test.go | 79 ++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/orderer/consensus/etcdraft/storage_test.go b/orderer/consensus/etcdraft/storage_test.go index 3668714594c..89a0b26d89b 100644 --- a/orderer/consensus/etcdraft/storage_test.go +++ b/orderer/consensus/etcdraft/storage_test.go @@ -395,3 +395,82 @@ func TestApplyOutOfDateSnapshot(t *testing.T) { assertFileCount(t, 12, 1) }) } + +func TestAbortWhenWritingSnapshot(t *testing.T) { + t.Run("Abort when writing snapshot", func(t *testing.T) { + setup(t) + defer clean(t) + + // set SegmentSizeBytes to a small value so that + // every entry persisted to wal would result in + // a new wal being created. + oldSegmentSizeBytes := wal.SegmentSizeBytes + wal.SegmentSizeBytes = 10 + defer func() { + wal.SegmentSizeBytes = oldSegmentSizeBytes + }() + + // create 5 new entries + for i := 0; i < 5; i++ { + store.Store( + []raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}}, + raftpb.HardState{Commit: uint64(i)}, + raftpb.Snapshot{}, + ) + } + assertFileCount(t, 6, 0) + + // Assume an orderer missed some records due to exceptions and receives a new snapshot from other orderers.
+ commit := 10 + store.Store( + []raftpb.Entry{}, + raftpb.HardState{Commit: uint64(commit)}, + raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: uint64(commit), + }, + Data: make([]byte, 100), + }, + ) + err = store.Close() + assert.NoError(t, err) + + // In the old logic, rs.wal.Save(hardstate, entries) was called first to save the state, so we would remove the snapshot files. + // sd, err := os.Open(snapDir) + // assert.NoError(t, err) + // defer sd.Close() + // names, err := sd.Readdirnames(-1) + // assert.NoError(t, err) + // sort.Sort(sort.Reverse(sort.StringSlice(names))) + // os.Remove(filepath.Join(snapDir, names[0])) + // wd, err := os.Open(walDir) + // assert.NoError(t, err) + // defer wd.Close() + // names, err = wd.Readdirnames(-1) + // assert.NoError(t, err) + // sort.Sort(sort.Reverse(sort.StringSlice(names))) + // os.Remove(filepath.Join(walDir, names[0])) + + // But in the new logic, rs.saveSnap(snapshot) is called first to save the snapshot, so we remove the WAL files. + wd, err := os.Open(walDir) + assert.NoError(t, err) + defer wd.Close() + names, err := wd.Readdirnames(-1) + assert.NoError(t, err) + sort.Sort(sort.Reverse(sort.StringSlice(names))) + os.Remove(filepath.Join(walDir, names[0])) + + // Then restart the orderer. + ram := raft.NewMemoryStorage() + store, err = CreateStorage(logger, walDir, snapDir, ram) + assert.NoError(t, err) + + // Check the state from go.etcd.io/etcd/raft/raft.go + // func (r *raft) loadState(state pb.HardState) + hd, _, err := store.ram.InitialState() + assert.NoError(t, err) + lastIndex, err := store.ram.LastIndex() + assert.NoError(t, err) + assert.False(t, hd.Commit > lastIndex) + }) +}