move time calculation to earlier in the pipeline

This commit is contained in:
William Banfield
2022-01-21 15:27:55 -05:00
parent 15ecd7ea66
commit 6c39dcdba7
6 changed files with 36 additions and 34 deletions

View File

@@ -23,6 +23,7 @@ import (
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmtime "github.com/tendermint/tendermint/libs/time"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
@@ -222,12 +223,12 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
lazyNodeState.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, ""})
lazyNodeState.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, "", tmtime.Now()})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
lazyNodeState.sendInternalMessage(ctx, msgInfo{&BlockPartMessage{
lazyNodeState.Height, lazyNodeState.Round, part,
}, ""})
}, "", tmtime.Now()})
}
} else if !lazyNodeState.replayMode {
lazyNodeState.logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err)

View File

@@ -101,13 +101,13 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock)
newRoundCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewRound)
timeoutCh := subscribe(ctx, t, cs.eventBus, types.EventQueryTimeoutPropose)
cs.setProposal = func(proposal *types.Proposal) error {
cs.setProposal = func(proposal *types.Proposal, recvTime time.Time) error {
if cs.Height == 2 && cs.Round == 0 {
// dont set the proposal in round 0 so we timeout and
// go to next round
return nil
}
return cs.defaultSetProposal(proposal)
return cs.defaultSetProposal(proposal, recvTime)
}
startTestRound(ctx, cs, height, round)

View File

@@ -16,6 +16,7 @@ import (
tmevents "github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmtime "github.com/tendermint/tendermint/libs/time"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
@@ -1184,7 +1185,7 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope,
select {
case <-ctx.Done():
return ctx.Err()
case r.state.peerMsgQueue <- msgInfo{pMsg, envelope.From}:
case r.state.peerMsgQueue <- msgInfo{pMsg, envelope.From, tmtime.Now()}:
}
case *tmcons.ProposalPOL:
ps.ApplyProposalPOLMessage(msgI.(*ProposalPOLMessage))
@@ -1194,7 +1195,7 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope,
ps.SetHasProposalBlockPart(bpMsg.Height, bpMsg.Round, int(bpMsg.Part.Index))
r.Metrics.BlockParts.With("peer_id", string(envelope.From)).Add(1)
select {
case r.state.peerMsgQueue <- msgInfo{bpMsg, envelope.From}:
case r.state.peerMsgQueue <- msgInfo{bpMsg, envelope.From, tmtime.Now()}:
return nil
case <-ctx.Done():
return ctx.Err()
@@ -1238,7 +1239,7 @@ func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope,
ps.SetHasVote(vMsg.Vote)
select {
case r.state.peerMsgQueue <- msgInfo{vMsg, envelope.From}:
case r.state.peerMsgQueue <- msgInfo{vMsg, envelope.From, tmtime.Now()}:
return nil
case <-ctx.Done():
return ctx.Err()

View File

@@ -46,8 +46,9 @@ var msgQueueSize = 1000
// msgs from the reactor which may update the state
type msgInfo struct {
Msg Message `json:"msg"`
PeerID types.NodeID `json:"peer_key"`
Msg Message `json:"msg"`
PeerID types.NodeID `json:"peer_key"`
ReceiveTime time.Time `json:"receive_time"`
}
// internally generated messages which may update the state
@@ -133,7 +134,7 @@ type State struct {
// some functions can be overwritten for testing
decideProposal func(ctx context.Context, height int64, round int32)
doPrevote func(ctx context.Context, height int64, round int32)
setProposal func(proposal *types.Proposal) error
setProposal func(proposal *types.Proposal, t time.Time) error
// closed when we finish shutting down
done chan struct{}
@@ -516,14 +517,14 @@ func (cs *State) AddVote(ctx context.Context, vote *types.Vote, peerID types.Nod
select {
case <-ctx.Done():
return ctx.Err()
case cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}:
case cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, "", tmtime.Now()}:
return nil
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}:
case cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID, tmtime.Now()}:
return nil
}
}
@@ -538,14 +539,14 @@ func (cs *State) SetProposal(ctx context.Context, proposal *types.Proposal, peer
select {
case <-ctx.Done():
return ctx.Err()
case cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}:
case cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, "", tmtime.Now()}:
return nil
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}:
case cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID, tmtime.Now()}:
return nil
}
}
@@ -559,14 +560,14 @@ func (cs *State) AddProposalBlockPart(ctx context.Context, height int64, round i
select {
case <-ctx.Done():
return ctx.Err()
case cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}:
case cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, "", tmtime.Now()}:
return nil
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}:
case cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID, tmtime.Now()}:
return nil
}
}
@@ -904,7 +905,7 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.setProposal(msg.Proposal)
err = cs.setProposal(msg.Proposal, mi.ReceiveTime)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
@@ -1260,11 +1261,11 @@ func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
cs.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, ""})
cs.sendInternalMessage(ctx, msgInfo{&ProposalMessage{proposal}, "", tmtime.Now()})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(ctx, msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
cs.sendInternalMessage(ctx, msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, "", tmtime.Now()})
}
cs.logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
@@ -1394,7 +1395,7 @@ func (cs *State) defaultDoPrevote(ctx context.Context, height int64, round int32
}
if cs.Proposal.POLRound == -1 && cs.LockedRound == -1 && !cs.proposalIsTimely() {
logger.Debug("prevote step: Proposal is not timely; prevoting nil - ",
logger.Debug("prevote step: Proposal is not timely; prevoting nil",
"proposed",
tmtime.Canonical(cs.Proposal.Timestamp).Format(time.RFC3339Nano),
"received",
@@ -1956,9 +1957,7 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) {
//-----------------------------------------------------------------------------
func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
recvTime := tmtime.Now()
func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error {
// Already have one
// TODO: possibly catch double proposals
if cs.Proposal != nil || proposal == nil {
@@ -2384,7 +2383,7 @@ func (cs *State) signAddVote(ctx context.Context, msgType tmproto.SignedMsgType,
// TODO: pass pubKey to signVote
vote, err := cs.signVote(ctx, msgType, hash, header)
if err == nil {
cs.sendInternalMessage(ctx, msgInfo{&VoteMessage{vote}, ""})
cs.sendInternalMessage(ctx, msgInfo{&VoteMessage{vote}, "", tmtime.Now()})
cs.logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}

View File

@@ -17,6 +17,7 @@ import (
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
tmtime "github.com/tendermint/tendermint/libs/time"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
@@ -2408,26 +2409,26 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
}
cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header())
cs.handleMsg(ctx, msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()})
statsMessage := <-cs.statsMsgQueue
require.Equal(t, msg, statsMessage.Msg, "")
require.Equal(t, peerID, statsMessage.PeerID, "")
// sending the same part from different peer
cs.handleMsg(ctx, msgInfo{msg, "peer2"})
cs.handleMsg(ctx, msgInfo{msg, "peer2", tmtime.Now()})
// sending the part with the same height, but different round
msg.Round = 1
cs.handleMsg(ctx, msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()})
// sending the part from the smaller height
msg.Height = 0
cs.handleMsg(ctx, msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()})
// sending the part from the bigger height
msg.Height = 3
cs.handleMsg(ctx, msgInfo{msg, peerID})
cs.handleMsg(ctx, msgInfo{msg, peerID, tmtime.Now()})
select {
case <-cs.statsMsgQueue:
@@ -2456,20 +2457,20 @@ func TestStateOutputVoteStats(t *testing.T) {
vote := signVote(ctx, t, vss[1], tmproto.PrecommitType, config.ChainID(), blockID)
voteMessage := &VoteMessage{vote}
cs.handleMsg(ctx, msgInfo{voteMessage, peerID})
cs.handleMsg(ctx, msgInfo{voteMessage, peerID, tmtime.Now()})
statsMessage := <-cs.statsMsgQueue
require.Equal(t, voteMessage, statsMessage.Msg, "")
require.Equal(t, peerID, statsMessage.PeerID, "")
// sending the same part from different peer
cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, "peer2"})
cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, "peer2", tmtime.Now()})
// sending the vote for the bigger height
incrementHeight(vss[1])
vote = signVote(ctx, t, vss[1], tmproto.PrecommitType, config.ChainID(), blockID)
cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, peerID})
cs.handleMsg(ctx, msgInfo{&VoteMessage{vote}, peerID, tmtime.Now()})
select {
case <-cs.statsMsgQueue:

View File

@@ -129,8 +129,8 @@ func DefaultSynchronyParams() SynchronyParams {
// TODO(@wbanfield): Determine experimental values for these defaults
// https://github.com/tendermint/tendermint/issues/7202
return SynchronyParams{
Precision: 10 * time.Minute,
MessageDelay: 500 * time.Minute,
Precision: 10 * time.Millisecond,
MessageDelay: 500 * time.Millisecond,
}
}