add timestamp difference histogram

This commit is contained in:
William Banfield
2022-01-26 18:15:58 -05:00
parent 2aacab562a
commit cdedf5f43a
4 changed files with 31 additions and 33 deletions

View File

@@ -229,7 +229,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
"message and the local time when the message was received. " +
"Only calculated when a new block is proposed.",
Buckets: []float64{-10, -.5, -.025, 0, .1, .5, 1, 1.5, 2, 10},
}, append(labels, "is_timely")).With(labelsAndValues...),
}, labels).With(labelsAndValues...),
}
}

View File

@@ -308,13 +308,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -334,7 +334,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
cs.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
default:
// don't punish (leave room for soft upgrades)

View File

@@ -44,8 +44,9 @@ var msgQueueSize = 1000
// msgs from the reactor which may update the state
type msgInfo struct {
Msg Message `json:"msg"`
PeerID p2p.ID `json:"peer_key"`
Msg Message `json:"msg"`
PeerID p2p.ID `json:"peer_key"`
ReceiveTime time.Time `json:"receive_time"`
}
// internally generated messages which may update the state
@@ -129,7 +130,7 @@ type State struct {
// some functions can be overwritten for testing
decideProposal func(height int64, round int32)
doPrevote func(height int64, round int32)
setProposal func(proposal *types.Proposal) error
setProposal func(proposal *types.Proposal, time time.Time) error
// closed when we finish shutting down
done chan struct{}
@@ -457,9 +458,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) {
// AddVote inputs a vote.
func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, "", time.Now()}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID, time.Now()}
}
// TODO: wait for event?!
@@ -470,9 +471,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error
func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, "", time.Now()}
} else {
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID, time.Now()}
}
// TODO: wait for event?!
@@ -483,9 +484,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.ID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, "", time.Now()}
} else {
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID, time.Now()}
}
// TODO: wait for event?!
@@ -662,6 +663,7 @@ func (cs *State) updateToState(state sm.State) {
cs.Validators = validators
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.LockedRound = -1
@@ -814,7 +816,7 @@ func (cs *State) handleMsg(mi msgInfo) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.setProposal(msg.Proposal)
err = cs.setProposal(msg.Proposal, mi.ReceiveTime)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
@@ -994,6 +996,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
} else {
logger.Debug("resetting proposal info")
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
}
@@ -1134,11 +1137,11 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, "", time.Now()})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, "", time.Now()})
}
cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
@@ -1789,7 +1792,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
//-----------------------------------------------------------------------------
func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error {
// Already have one
// TODO: possibly catch double proposals
if cs.Proposal != nil {
@@ -2229,7 +2232,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, "", time.Now()})
cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}
@@ -2374,13 +2377,7 @@ func repairWalFile(src, dst string) error {
func (cs *State) calculateProposalTimestampDifferenceMetric() {
if cs.Proposal != nil && cs.Proposal.POLRound == -1 {
tp := types.SynchronyParams{
Precision: cs.state.ConsensusParams.Synchrony.Precision,
MessageDelay: cs.state.ConsensusParams.Synchrony.MessageDelay,
}
isTimely := cs.Proposal.IsTimely(cs.ProposalReceiveTime, tp, cs.state.InitialHeight)
cs.metrics.ProposalTimestampDifference.With("is_timely", fmt.Sprintf("%t", isTimely)).
cs.metrics.ProposalTimestampDifference.
Observe(cs.ProposalReceiveTime.Sub(cs.Proposal.Timestamp).Seconds())
}
}

View File

@@ -71,14 +71,15 @@ type RoundState struct {
StartTime time.Time `json:"start_time"`
// Subjective time when +2/3 precommits for Block at Round were found
CommitTime time.Time `json:"commit_time"`
Validators *types.ValidatorSet `json:"validators"`
Proposal *types.Proposal `json:"proposal"`
ProposalBlock *types.Block `json:"proposal_block"`
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
LockedRound int32 `json:"locked_round"`
LockedBlock *types.Block `json:"locked_block"`
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
CommitTime time.Time `json:"commit_time"`
Validators *types.ValidatorSet `json:"validators"`
Proposal *types.Proposal `json:"proposal"`
ProposalReceiveTime time.Time `json:"proposal_receive_time"`
ProposalBlock *types.Block `json:"proposal_block"`
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
LockedRound int32 `json:"locked_round"`
LockedBlock *types.Block `json:"locked_block"`
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
// Last known round with POL for non-nil valid block.
ValidRound int32 `json:"valid_round"`