diff --git a/consensus/metrics.go b/consensus/metrics.go index a92c949ef..4f7df530e 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -229,7 +229,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { "message and the local time when the message was received. " + "Only calculated when a new block is proposed.", Buckets: []float64{-10, -.5, -.025, 0, .1, .5, 1, 1.5, 2, 10}, - }, append(labels, "is_timely")).With(labelsAndValues...), + }, labels).With(labelsAndValues...), } } diff --git a/consensus/reactor.go b/consensus/reactor.go index 0a7c590be..cc54367bb 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -308,13 +308,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) - conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} + conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()} case *ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1) - conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} + conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } @@ -334,7 +334,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - cs.peerMsgQueue <- msgInfo{msg, src.ID()} + cs.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()} default: // don't punish (leave room for soft upgrades) diff --git a/consensus/state.go b/consensus/state.go index 358e78606..826c4bea8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -44,8 +44,9 @@ var msgQueueSize = 1000 // msgs from the reactor which may update the state type msgInfo struct { - Msg Message `json:"msg"` - PeerID p2p.ID `json:"peer_key"` + Msg Message `json:"msg"` + PeerID p2p.ID `json:"peer_key"` + ReceiveTime time.Time `json:"receive_time"` } // internally generated messages which may update the state @@ -129,7 +130,7 @@ type State struct { // some functions can be overwritten for testing decideProposal func(height int64, round int32) doPrevote func(height int64, round int32) - setProposal func(proposal *types.Proposal) error + setProposal func(proposal *types.Proposal, time time.Time) error // closed when we finish shutting down done chan struct{} @@ -457,9 +458,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) { // AddVote inputs a vote. func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) { if peerID == "" { - cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""} + cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, "", time.Now()} } else { - cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID} + cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID, time.Now()} } // TODO: wait for event?! @@ -470,9 +471,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error { if peerID == "" { - cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""} + cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, "", time.Now()} } else { - cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID} + cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID, time.Now()} } // TODO: wait for event?! @@ -483,9 +484,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error { func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.ID) error { if peerID == "" { - cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""} + cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, "", time.Now()} } else { - cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID} + cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID, time.Now()} } // TODO: wait for event?! @@ -662,6 +663,7 @@ func (cs *State) updateToState(state sm.State) { cs.Validators = validators cs.Proposal = nil + cs.ProposalReceiveTime = time.Time{} cs.ProposalBlock = nil cs.ProposalBlockParts = nil cs.LockedRound = -1 @@ -814,7 +816,7 @@ func (cs *State) handleMsg(mi msgInfo) { case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts - err = cs.setProposal(msg.Proposal) + err = cs.setProposal(msg.Proposal, mi.ReceiveTime) case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit @@ -994,6 +996,7 @@ func (cs *State) enterNewRound(height int64, round int32) { } else { logger.Debug("resetting proposal info") cs.Proposal = nil + cs.ProposalReceiveTime = time.Time{} cs.ProposalBlock = nil cs.ProposalBlockParts = nil } @@ -1134,11 +1137,11 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { proposal.Signature = p.Signature // send proposal and block parts on internal msg queue - cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) + cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, "", time.Now()}) for i := 0; i < int(blockParts.Total()); i++ { part := blockParts.GetPart(i) - cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) + cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, "", time.Now()}) } cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal) @@ -1789,7 +1792,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) { //----------------------------------------------------------------------------- -func (cs *State) defaultSetProposal(proposal *types.Proposal) error { +func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error { // Already have one // TODO: possibly catch double proposals if cs.Proposal != nil { @@ -2229,7 +2232,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header // TODO: pass pubKey to signVote vote, err := cs.signVote(msgType, hash, header) if err == nil { - cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""}) + cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, "", time.Now()}) cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) return vote } @@ -2374,13 +2377,7 @@ func repairWalFile(src, dst string) error { func (cs *State) calculateProposalTimestampDifferenceMetric() { if cs.Proposal != nil && cs.Proposal.POLRound == -1 { - tp := types.SynchronyParams{ - Precision: cs.state.ConsensusParams.Synchrony.Precision, - MessageDelay: cs.state.ConsensusParams.Synchrony.MessageDelay, - } - - isTimely := cs.Proposal.IsTimely(cs.ProposalReceiveTime, tp, cs.state.InitialHeight) - cs.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", isTimely)). + cs.metrics.ProposalTimestampDifference. Observe(cs.ProposalReceiveTime.Sub(cs.Proposal.Timestamp).Seconds()) } } diff --git a/consensus/types/round_state.go b/consensus/types/round_state.go index 9e67b76c0..190f8576f 100644 --- a/consensus/types/round_state.go +++ b/consensus/types/round_state.go @@ -71,14 +71,15 @@ type RoundState struct { StartTime time.Time `json:"start_time"` // Subjective time when +2/3 precommits for Block at Round were found - CommitTime time.Time `json:"commit_time"` - Validators *types.ValidatorSet `json:"validators"` - Proposal *types.Proposal `json:"proposal"` - ProposalBlock *types.Block `json:"proposal_block"` - ProposalBlockParts *types.PartSet `json:"proposal_block_parts"` - LockedRound int32 `json:"locked_round"` - LockedBlock *types.Block `json:"locked_block"` - LockedBlockParts *types.PartSet `json:"locked_block_parts"` + CommitTime time.Time `json:"commit_time"` + Validators *types.ValidatorSet `json:"validators"` + Proposal *types.Proposal `json:"proposal"` + ProposalReceiveTime time.Time `json:"proposal_receive_time"` + ProposalBlock *types.Block `json:"proposal_block"` + ProposalBlockParts *types.PartSet `json:"proposal_block_parts"` + LockedRound int32 `json:"locked_round"` + LockedBlock *types.Block `json:"locked_block"` + LockedBlockParts *types.PartSet `json:"locked_block_parts"` // Last known round with POL for non-nil valid block. ValidRound int32 `json:"valid_round"`