server.go
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package sample
import (
"compress/gzip"
"context"
"crypto"
"encoding/binary"
"fmt"
"os"
"time"
"github.com/hyperledger-labs/mirbft"
"github.com/hyperledger-labs/mirbft/pkg/eventlog"
pb "github.com/hyperledger-labs/mirbft/pkg/pb/msgs"
"github.com/hyperledger-labs/mirbft/pkg/reqstore"
"github.com/hyperledger-labs/mirbft/pkg/simplewal"
"github.com/jyellick/mirbft-sample/config"
"github.com/jyellick/mirbft-sample/network"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
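// Server wires a mirbft node to its persistence (WAL, request store, and an
// optional event log) and to its network transport, and runs the node until
// Stop is called.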
type Server struct {
Logger *zap.SugaredLogger
NodeConfig *config.NodeConfig
WALPath string
RequestStorePath string
EventLogPath string
Serial bool
doneC chan struct{}
exitC chan struct{}
}
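// MirLogAdapter adapts a zap.SugaredLogger so it can serve as the Logger in
// the mirbft Config, mapping mirbft log levels onto the corresponding zap
// calls.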
type MirLogAdapter zap.SugaredLogger
func (m *MirLogAdapter) Log(level mirbft.LogLevel, msg string, pairs ...interface{}) {
z := (*zap.SugaredLogger)(m)
switch level {
case mirbft.LevelDebug:
z.Debugw(msg, pairs...)
case mirbft.LevelInfo:
z.Infow(msg, pairs...)
case mirbft.LevelWarn:
z.Warnw(msg, pairs...)
case mirbft.LevelError:
z.Errorw(msg, pairs...)
}
}
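// Run opens the node's persistent state (WAL, request store, and optional
// event log recorder), starts the network transport, and drives the mirbft
// node until the Server is stopped. If the WAL is empty, the node bootstraps
// as a new node from the initial network state; otherwise it restarts from
// the state recorded in the WAL.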
func (s *Server) Run() error {
s.doneC = make(chan struct{})
s.exitC = make(chan struct{})
defer close(s.exitC)
mirConfig := mirConfig(s.NodeConfig)
mirConfig.Logger = (*MirLogAdapter)(s.Logger)
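// Optionally record the node's event stream to a file for later inspection.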
var recorder *eventlog.Recorder
if s.EventLogPath != "" {
file, err := os.Create(s.EventLogPath)
if err != nil {
return errors.WithMessage(err, "could not create event log file")
}
recorder = eventlog.NewRecorder(
s.NodeConfig.ID,
file,
eventlog.CompressionLevelOpt(gzip.NoCompression),
)
defer recorder.Stop()
}
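// The WAL persists protocol state across restarts; an empty WAL means this
// is the node's first start.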
wal, err := simplewal.Open(s.WALPath)
if err != nil {
return errors.WithMessage(err, "could not open WAL")
}
defer wal.Close()
firstStart, err := wal.IsEmpty()
if err != nil {
return errors.WithMessage(err, "could not query WAL")
}
reqStore, err := reqstore.Open(s.RequestStorePath)
if err != nil {
return errors.WithMessage(err, "could not open request store")
}
defer reqStore.Close()
// Create transport
t, err := network.NewServerTransport(s.Logger, s.NodeConfig)
if err != nil {
return errors.WithMessage(err, "could not create networking")
}
node, err := mirbft.NewNode(
s.NodeConfig.ID,
mirConfig,
&mirbft.ProcessorConfig{
Link: t,
Hasher: crypto.SHA256,
App: &application{
reqStore: reqStore,
}, // TODO, make more useful fixme
RequestStore: reqStore,
WAL: wal,
Interceptor: recorder,
},
)
if err != nil {
return errors.WithMessage(err, "could not create mirbft node")
}
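// Register the two transport handlers: one for protocol messages from other
// nodes (unmarshaled and stepped into the mirbft node), and one for client
// traffic (an empty payload queries the client's next request number; a
// non-empty payload is a marshaled pb.Request to propose).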
t.Handle(
func(nodeID uint64, data []byte) ([]byte, error) {
msg := &pb.Msg{}
err := proto.Unmarshal(data, msg)
if err != nil {
return nil, errors.WithMessage(err, "unexpected unmarshaling error")
}
err = node.Step(context.Background(), nodeID, msg)
if err != nil {
return nil, errors.WithMessage(err, "failed to step message to mir node")
}
return nil, nil
},
func(clientID uint64, data []byte) ([]byte, error) {
// This simple protocol replies with the next reqno if no request
// data is supplied.
if len(data) == 0 {
proposer := node.Client(clientID)
nextReqNo, err := proposer.NextReqNo()
if err != nil {
return nil, errors.WithMessage(err, "could not get next request number")
}
encodedNextReqNo := make([]byte, 8)
binary.BigEndian.PutUint64(encodedNextReqNo, nextReqNo)
return encodedNextReqNo, nil
}
msg := &pb.Request{}
err := proto.Unmarshal(data, msg)
if err != nil {
return nil, errors.WithMessage(err, "unexpected unmarshaling error")
}
if msg.ClientId != clientID {
return nil, errors.Errorf("client ID mismatch, claims to be %d but is %d", msg.ClientId, clientID)
}
proposer := node.Client(clientID)
err = proposer.Propose(context.Background(), msg.ReqNo, msg.Data)
if err != nil {
return nil, errors.WithMessagef(err, "failed to propose message to client %d", clientID)
}
return nil, nil
},
)
err = t.Start()
if err != nil {
return errors.WithMessage(err, "could not start networking")
}
defer t.Close()
// TODO, maybe detect if this is first start and actually
// detect other node liveness via the transport?
// let the links establish first to reduce logspam...
time.Sleep(2 * time.Second)
ticker := time.NewTicker(s.NodeConfig.MirRuntime.TickInterval)
defer ticker.Stop()
// Main control loop
if firstStart {
return node.ProcessAsNewNode(s.doneC, ticker.C, initialNetworkState(s.NodeConfig), []byte("initial-checkpoint-value"))
}
return node.RestartProcessing(s.doneC, ticker.C)
}
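// Stop signals Run to shut down and blocks until it has exited.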
func (s *Server) Stop() {
close(s.doneC)
<-s.exitC
}
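// application is a minimal mirbft application: it counts committed requests,
// prints them as they are applied, and checkpoints the count together with
// the network state.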
type application struct {
count uint64
reqStore *reqstore.Store
}
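// Apply commits a batch of requests, fetching each request's payload from the
// request store and logging it.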
func (app *application) Apply(entry *pb.QEntry) error {
fmt.Printf("Committing an entry for seq_no=%d (current count=%d)\n", entry.SeqNo, app.count)
for _, request := range entry.Requests {
reqData, err := app.reqStore.GetRequest(request)
if err != nil {
return errors.WithMessage(err, "could get entry from request store")
}
fmt.Printf(" Applying clientID=%d reqNo=%d with data of length %d start %q to log\n", request.ClientId, request.ReqNo, len(reqData), string(reqData[:26]))
app.count++
}
return nil
}
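// Snap produces a checkpoint value consisting of the applied-request count
// followed by the marshaled network state.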
func (app *application) Snap(networkConfig *pb.NetworkState_Config, clients []*pb.NetworkState_Client) ([]byte, []*pb.Reconfiguration, error) {
// XXX: we put the entire configuration into the snapshot value. We should
// really hash it and have some protocol-level state transfer, but this is
// easy for now and the state is relatively small. Also note that proto
// marshaling isn't deterministic, but for now it's good enough.
data, err := proto.Marshal(&pb.NetworkState{
Config: networkConfig,
Clients: clients,
})
if err != nil {
return nil, nil, errors.WithMessage(err, "could not marsshal network state")
}
countValue := make([]byte, 8)
binary.BigEndian.PutUint64(countValue, uint64(app.count))
return append(countValue, data...), nil, nil
}
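// TransferTo restores application state from a checkpoint value produced by
// Snap, returning the embedded network state.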
func (app *application) TransferTo(seq uint64, value []byte) (*pb.NetworkState, error) {
countValue := value[:8]
app.count = binary.BigEndian.Uint64(countValue)
stateValue := value[8:]
ns := &pb.NetworkState{}
err := proto.Unmarshal(stateValue, ns)
if err != nil {
return nil, errors.WithMessage(err, "could not unmarshal checkpoint value to network state")
}
fmt.Printf("Completed state transfer to sequence %d with a total count of %d requests applied\n", seq, app.count)
return ns, nil
}
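// mirConfig translates the node's runtime settings into a mirbft.Config.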
func mirConfig(nodeConfig *config.NodeConfig) *mirbft.Config {
return &mirbft.Config{
BatchSize: nodeConfig.MirRuntime.BatchSize,
HeartbeatTicks: nodeConfig.MirRuntime.HeartbeatTicks,
SuspectTicks: nodeConfig.MirRuntime.SuspectTicks,
NewEpochTimeoutTicks: nodeConfig.MirRuntime.NewEpochTimeoutTicks,
BufferSize: nodeConfig.MirRuntime.BufferSize,
}
}
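// initialNetworkState derives the bootstrap network state from the static
// node configuration.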
func initialNetworkState(nodeConfig *config.NodeConfig) *pb.NetworkState {
// The sample application relies on the configuration assigning client IDs contiguously starting from 0.
networkState := mirbft.StandardInitialNetworkState(len(nodeConfig.Nodes), len(nodeConfig.Clients))
networkState.Config.NumberOfBuckets = int32(nodeConfig.MirBootstrap.NumberOfBuckets)
networkState.Config.CheckpointInterval = int32(nodeConfig.MirBootstrap.CheckpointInterval)
for _, client := range networkState.Clients {
client.Width = nodeConfig.MirBootstrap.ClientWindowSize
}
return networkState
}
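// Example usage (an illustrative sketch, not part of this file: nodeConfig is
// assumed to be loaded elsewhere, and the paths are placeholders):
//
//	logger, _ := zap.NewDevelopment()
//	server := &sample.Server{
//		Logger:           logger.Sugar(),
//		NodeConfig:       nodeConfig,
//		WALPath:          "/var/run/mir/wal",
//		RequestStorePath: "/var/run/mir/reqstore",
//	}
//	go func() {
//		if err := server.Run(); err != nil {
//			logger.Sugar().Fatalw("server exited with error", "err", err)
//		}
//	}()
//	// ... later, on shutdown ...
//	server.Stop()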