diff --git a/internal/consensus/iterator.go b/internal/consensus/iterator.go index 97b962283..9472031a2 100644 --- a/internal/consensus/iterator.go +++ b/internal/consensus/iterator.go @@ -88,7 +88,7 @@ func (l *LiveIterator) Next(ctx context.Context) bool { case t := <-l.timeoutTicker.Chan(): l.e = Event{Message: t, Time: time.Now()} case <-l.txNotifier.TxsAvailable(): - l.e = Event{txAvailable{}, time.Now()} + l.e = Event{Message: txAvailable{}, Time: time.Now()} case <-ctx.Done(): l.err = ctx.Err() return false @@ -103,3 +103,39 @@ func (l *LiveIterator) Event() Event { func (l *LiveIterator) Error() error { return l.err } + +type WALWritingIterator struct { + wal WAL + Wrapped Iterator + + err error + e Event +} + +func (w *WALWritingIterator) Next(ctx context.Context) bool { + if !w.Wrapped.Next(ctx) { + return false + } + w.e = w.Wrapped.Event() + + switch m := w.e.Message.(type) { + case msgInfo, timeoutInfo: + if err := w.wal.WriteSync(m); err != nil { + w.err = err + return false + } + default: + } + return true +} + +func (w *WALWritingIterator) Event() Event { + return w.e +} + +func (w *WALWritingIterator) Error() error { + if w.err != nil { + return w.err + } + return w.Wrapped.Error() +} diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index dccfca325..ba1e904c8 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -144,28 +144,6 @@ func (cs *State) catchupReplay(ctx context.Context, csHeight int64) error { dec := WALDecoder{gr} cs.receiveRoutine(ctx, WALReadingIterator{dec: dec}, 0) - /* - LOOP: - for { - msg, err = dec.Decode() - switch { - case err == io.EOF: - break LOOP - case IsDataCorruptionError(err): - cs.logger.Error("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight) - return err - case err != nil: - return err - } - - // NOTE: since the priv key is set when the msgs are received - // it will attempt to eg double sign but we can just ignore it - // since the votes will be replayed and we'll get to the next step - if err := cs.readReplayMessage(ctx, msg, nil); err != nil { - return err - } - } - */ cs.logger.Info("Replay: Done") return nil } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index fad9648a6..6b2d01c09 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -442,14 +442,21 @@ func (cs *State) OnStart(ctx context.Context) error { } // now start the receiveRoutine - iter := LiveIterator{ + var iter Iterator + + iter = &LiveIterator{ peerMsgQueue: cs.peerMsgQueue, internalMsgQueue: cs.internalMsgQueue, timeoutTicker: cs.timeoutTicker, txNotifier: cs.txNotifier, } - go cs.receiveRoutine(ctx, &iter, 0) + iter = &WALWritingIterator{ + wal: cs.wal, + Wrapped: iter, + } + + go cs.receiveRoutine(ctx, iter, 0) // schedule the first round! // use GetRoundState so we don't race the receiveRoutine for access @@ -469,7 +476,13 @@ func (cs *State) startRoutines(ctx context.Context, maxSteps int) { return } - go cs.receiveRoutine(ctx, testIterator{}, maxSteps) + iter := &LiveIterator{ + peerMsgQueue: cs.peerMsgQueue, + internalMsgQueue: cs.internalMsgQueue, + timeoutTicker: cs.timeoutTicker, + txNotifier: cs.txNotifier, + } + go cs.receiveRoutine(ctx, iter, maxSteps) } // loadWalFile loads WAL data from file. It overwrites cs.wal. @@ -866,7 +879,6 @@ func (cs *State) receiveRoutine(ctx context.Context, iter Iterator, maxSteps int }() for iter.Next(ctx) { - // multi iterator e := iter.Event() if maxSteps > 0 { if cs.nSteps >= maxSteps { @@ -875,12 +887,6 @@ func (cs *State) receiveRoutine(ctx context.Context, iter Iterator, maxSteps int return } } - select { - case <-ctx.Done(): - onExit(cs) - return - default: - } switch msg := e.Message.(type) { case txAvailable: @@ -897,6 +903,12 @@ func (cs *State) receiveRoutine(ctx context.Context, iter Iterator, maxSteps int if err := iter.Error(); err != nil { cs.logger.Error("error iterating over consensus messages", "err", err) } + select { + case <-ctx.Done(): + onExit(cs) + return + default: + } } // state transitions on complete-proposal, 2/3-any, 2/3-one