From 6c39dcdba72a403dfada2e032dc2f13c0173a529 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 21 Jan 2022 15:27:55 -0500 Subject: [PATCH] move time calculation to earlier in the pipeline --- internal/consensus/byzantine_test.go | 5 +++-- internal/consensus/mempool_test.go | 4 ++-- internal/consensus/reactor.go | 7 +++--- internal/consensus/state.go | 33 ++++++++++++++-------------- internal/consensus/state_test.go | 17 +++++++------- types/params.go | 4 ++-- 6 files changed, 36 insertions(+), 34 deletions(-) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 8a0840d39..217fca1af 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -23,6 +23,7 @@ import ( "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/internal/test/factory" "github.com/tendermint/tendermint/libs/log" + tmtime "github.com/tendermint/tendermint/libs/time" tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/types" @@ -222,12 +223,12 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { proposal.Signature = p.Signature // send proposal and block parts on internal msg queue - lazyNodeState.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, ""}) + lazyNodeState.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, "", tmtime.Now()}) for i := 0; i < int(blockParts.Total()); i++ { part := blockParts.GetPart(i) lazyNodeState.sendInternalMessage(ctx, msgInfo{&BlockPartMessage{ lazyNodeState.Height, lazyNodeState.Round, part, - }, ""}) + }, "", tmtime.Now()}) } } else if !lazyNodeState.replayMode { lazyNodeState.logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err) diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index 0ae0d2d9c..78f064600 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -101,13 +101,13 @@ func TestMempoolProgressInHigherRound(t *testing.T) { newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock) newRoundCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewRound) timeoutCh := subscribe(ctx, t, cs.eventBus, types.EventQueryTimeoutPropose) - cs.setProposal = func(proposal *types.Proposal) error { + cs.setProposal = func(proposal *types.Proposal, recvTime time.Time) error { if cs.Height == 2 && cs.Round == 0 { // dont set the proposal in round 0 so we timeout and // go to next round return nil } - return cs.defaultSetProposal(proposal) + return cs.defaultSetProposal(proposal, recvTime) } startTestRound(ctx, cs, height, round) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 127b175bc..7f4dc4426 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -16,6 +16,7 @@ import ( tmevents "github.com/tendermint/tendermint/libs/events" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" + tmtime "github.com/tendermint/tendermint/libs/time" tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/types" @@ -1184,7 +1185,7 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope, select { case <-ctx.Done(): return ctx.Err() - case r.state.peerMsgQueue <- msgInfo{pMsg, envelope.From}: + case r.state.peerMsgQueue <- msgInfo{pMsg, envelope.From, tmtime.Now()}: } case *tmcons.ProposalPOL: ps.ApplyProposalPOLMessage(msgI.(*ProposalPOLMessage)) @@ -1194,7 +1195,7 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope, ps.SetHasProposalBlockPart(bpMsg.Height, bpMsg.Round, int(bpMsg.Part.Index)) r.Metrics.BlockParts.With("peer_id", string(envelope.From)).Add(1) select { - case r.state.peerMsgQueue <- msgInfo{bpMsg, envelope.From}: + case r.state.peerMsgQueue <- msgInfo{bpMsg, envelope.From, tmtime.Now()}: return nil case <-ctx.Done(): return ctx.Err() @@ -1238,7 +1239,7 @@ func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope, ps.SetHasVote(vMsg.Vote) select { - case r.state.peerMsgQueue <- msgInfo{vMsg, envelope.From}: + case r.state.peerMsgQueue <- msgInfo{vMsg, envelope.From, tmtime.Now()}: return nil case <-ctx.Done(): return ctx.Err() diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 512b1b789..29af5eada 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -46,8 +46,9 @@ var msgQueueSize = 1000 // msgs from the reactor which may update the state type msgInfo struct { - Msg Message `json:"msg"` - PeerID types.NodeID `json:"peer_key"` + Msg Message `json:"msg"` + PeerID types.NodeID `json:"peer_key"` + ReceiveTime time.Time `json:"receive_time"` } // internally generated messages which may update the state @@ -133,7 +134,7 @@ type State struct { // some functions can be overwritten for testing decideProposal func(ctx context.Context, height int64, round int32) doPrevote func(ctx context.Context, height int64, round int32) - setProposal func(proposal *types.Proposal) error + setProposal func(proposal *types.Proposal, t time.Time) error // closed when we finish shutting down done chan struct{} @@ -516,14 +517,14 @@ func (cs *State) AddVote(ctx context.Context, vote *types.Vote, peerID types.Nod select { case <-ctx.Done(): return ctx.Err() - case cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}: + case cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, "", tmtime.Now()}: return nil } } else { select { case <-ctx.Done(): return ctx.Err() - case cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}: + case cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID, tmtime.Now()}: return nil } } @@ -538,14 +539,14 @@ func (cs *State) SetProposal(ctx context.Context, proposal *types.Proposal, peer select { case <-ctx.Done(): return ctx.Err() - case cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}: + case cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, "", tmtime.Now()}: return nil } } else { select { case <-ctx.Done(): return ctx.Err() - case cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}: + case cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID, tmtime.Now()}: return nil } } @@ -559,14 +560,14 @@ func (cs *State) AddProposalBlockPart(ctx context.Context, height int64, round i select { case <-ctx.Done(): return ctx.Err() - case cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}: + case cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, "", tmtime.Now()}: return nil } } else { select { case <-ctx.Done(): return ctx.Err() - case cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}: + case cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID, tmtime.Now()}: return nil } } @@ -904,7 +905,7 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts - err = cs.setProposal(msg.Proposal) + err = cs.setProposal(msg.Proposal, mi.ReceiveTime) case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit @@ -1260,11 +1261,11 @@ func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round proposal.Signature = p.Signature // send proposal and block parts on internal msg queue - cs.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, ""}) + cs.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, "", tmtime.Now()}) for i := 0; i < int(blockParts.Total()); i++ { part := blockParts.GetPart(i) - cs.sendInternalMessage(ctx, msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) + cs.sendInternalMessage(ctx, msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, "", tmtime.Now()}) } cs.logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal) @@ -1394,7 +1395,7 @@ func (cs *State) defaultDoPrevote(ctx context.Context, height int64, round int32 } if cs.Proposal.POLRound == -1 && cs.LockedRound == -1 && !cs.proposalIsTimely() { - logger.Debug("prevote step: Proposal is not timely; prevoting nil - ", + logger.Debug("prevote step: Proposal is not timely; prevoting nil", "proposed", tmtime.Canonical(cs.Proposal.Timestamp).Format(time.RFC3339Nano), "received", @@ -1956,9 +1957,7 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) { //----------------------------------------------------------------------------- -func (cs *State) defaultSetProposal(proposal *types.Proposal) error { - recvTime := tmtime.Now() - +func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error { // Already have one // TODO: possibly catch double proposals if cs.Proposal != nil || proposal == nil { @@ -2384,7 +2383,7 @@ func (cs *State) signAddVote(ctx context.Context, msgType tmproto.SignedMsgType, // TODO: pass pubKey to signVote vote, err := cs.signVote(ctx, msgType, hash, header) if err == nil { - cs.sendInternalMessage(ctx, msgInfo{&VoteMessage{vote}, ""}) + cs.sendInternalMessage(ctx, msgInfo{&VoteMessage{vote}, "", tmtime.Now()}) cs.logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) return vote } diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index dc308def0..0957f312d 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -17,6 +17,7 @@ import ( tmpubsub "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" + tmtime "github.com/tendermint/tendermint/libs/time" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/types" ) @@ -2408,26 +2409,26 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { } cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header()) - cs.handleMsg(ctx, msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()}) statsMessage := <-cs.statsMsgQueue require.Equal(t, msg, statsMessage.Msg, "") require.Equal(t, peerID, statsMessage.PeerID, "") // sending the same part from different peer - cs.handleMsg(ctx, msgInfo{msg, "peer2"}) + cs.handleMsg(ctx, msgInfo{msg, "peer2", tmtime.Now()}) // sending the part with the same height, but different round msg.Round = 1 - cs.handleMsg(ctx, msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()}) // sending the part from the smaller height msg.Height = 0 - cs.handleMsg(ctx, msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()}) // sending the part from the bigger height msg.Height = 3 - cs.handleMsg(ctx, msgInfo{msg, peerID}) + cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()}) select { case <-cs.statsMsgQueue: @@ -2456,20 +2457,20 @@ func TestStateOutputVoteStats(t *testing.T) { vote := signVote(ctx, t, vss[1], tmproto.PrecommitType, config.ChainID(), blockID) voteMessage := &VoteMessage{vote} - cs.handleMsg(ctx, msgInfo{voteMessage, peerID}) + cs.handleMsg(ctx, msgInfo{voteMessage, peerID, tmtime.Now()}) statsMessage := <-cs.statsMsgQueue require.Equal(t, voteMessage, statsMessage.Msg, "") require.Equal(t, peerID, statsMessage.PeerID, "") // sending the same part from different peer - cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, "peer2"}) + cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, "peer2", tmtime.Now()}) // sending the vote for the bigger height incrementHeight(vss[1]) vote = signVote(ctx, t, vss[1], tmproto.PrecommitType, config.ChainID(), blockID) - cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, peerID}) + cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, peerID, tmtime.Now()}) select { case <-cs.statsMsgQueue: diff --git a/types/params.go b/types/params.go index b27838eeb..df495b94d 100644 --- a/types/params.go +++ b/types/params.go @@ -129,8 +129,8 @@ func DefaultSynchronyParams() SynchronyParams { // TODO(@wbanfield): Determine experimental values for these defaults // https://github.com/tendermint/tendermint/issues/7202 return SynchronyParams{ - Precision: 10 * time.Minute, - MessageDelay: 500 * time.Minute, + Precision: 10 * time.Millisecond, + MessageDelay: 500 * time.Millisecond, } }