tests pass

This commit is contained in:
William Banfield
2022-02-21 16:16:25 -05:00
parent 85a6a908d1
commit 4b581d9e02
3 changed files with 59 additions and 33 deletions

View File

@@ -88,7 +88,7 @@ func (l *LiveIterator) Next(ctx context.Context) bool {
case t := <-l.timeoutTicker.Chan():
l.e = Event{Message: t, Time: time.Now()}
case <-l.txNotifier.TxsAvailable():
l.e = Event{txAvailable{}, time.Now()}
l.e = Event{Message: txAvailable{}, Time: time.Now()}
case <-ctx.Done():
l.err = ctx.Err()
return false
@@ -103,3 +103,39 @@ func (l *LiveIterator) Event() Event {
func (l *LiveIterator) Error() error {
return l.err
}
type WALWritingIterator struct {
wal WAL
Wrapped Iterator
err error
e Event
}
func (w *WALWritingIterator) Next(ctx context.Context) bool {
if !w.Wrapped.Next(ctx) {
return false
}
w.e = w.Wrapped.Event()
switch m := w.e.Message.(type) {
case msgInfo, timeoutInfo:
if err := w.wal.WriteSync(m); err != nil {
w.err = err
return false
}
default:
}
return true
}
func (w *WALWritingIterator) Event() Event {
return w.e
}
func (w *WALWritingIterator) Error() error {
if w.err != nil {
return w.err
}
return w.Wrapped.Error()
}

View File

@@ -144,28 +144,6 @@ func (cs *State) catchupReplay(ctx context.Context, csHeight int64) error {
dec := WALDecoder{gr}
cs.receiveRoutine(ctx, WALReadingIterator{dec: dec}, 0)
/*
LOOP:
for {
msg, err = dec.Decode()
switch {
case err == io.EOF:
break LOOP
case IsDataCorruptionError(err):
cs.logger.Error("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
return err
case err != nil:
return err
}
// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(ctx, msg, nil); err != nil {
return err
}
}
*/
cs.logger.Info("Replay: Done")
return nil
}

View File

@@ -442,14 +442,21 @@ func (cs *State) OnStart(ctx context.Context) error {
}
// now start the receiveRoutine
iter := LiveIterator{
var iter Iterator
iter = &LiveIterator{
peerMsgQueue: cs.peerMsgQueue,
internalMsgQueue: cs.internalMsgQueue,
timeoutTicker: cs.timeoutTicker,
txNotifier: cs.txNotifier,
}
go cs.receiveRoutine(ctx, &iter, 0)
iter = &WALWritingIterator{
wal: cs.wal,
Wrapped: iter,
}
go cs.receiveRoutine(ctx, iter, 0)
// schedule the first round!
// use GetRoundState so we don't race the receiveRoutine for access
@@ -469,7 +476,13 @@ func (cs *State) startRoutines(ctx context.Context, maxSteps int) {
return
}
go cs.receiveRoutine(ctx, testIterator{}, maxSteps)
iter := &LiveIterator{
peerMsgQueue: cs.peerMsgQueue,
internalMsgQueue: cs.internalMsgQueue,
timeoutTicker: cs.timeoutTicker,
txNotifier: cs.txNotifier,
}
go cs.receiveRoutine(ctx, iter, maxSteps)
}
// loadWalFile loads WAL data from file. It overwrites cs.wal.
@@ -866,7 +879,6 @@ func (cs *State) receiveRoutine(ctx context.Context, iter Iterator, maxSteps int
}()
for iter.Next(ctx) {
// multi iterator
e := iter.Event()
if maxSteps > 0 {
if cs.nSteps >= maxSteps {
@@ -875,12 +887,6 @@ func (cs *State) receiveRoutine(ctx context.Context, iter Iterator, maxSteps int
return
}
}
select {
case <-ctx.Done():
onExit(cs)
return
default:
}
switch msg := e.Message.(type) {
case txAvailable:
@@ -897,6 +903,12 @@ func (cs *State) receiveRoutine(ctx context.Context, iter Iterator, maxSteps int
if err := iter.Error(); err != nil {
cs.logger.Error("error iterating over consensus messages", "err", err)
}
select {
case <-ctx.Done():
onExit(cs)
return
default:
}
}
// state transitions on complete-proposal, 2/3-any, 2/3-one