Compare commits

...

3 Commits

Author SHA1 Message Date
William Banfield
cdedf5f43a add timestamp difference histogram 2022-01-26 18:15:58 -05:00
William Banfield
2aacab562a metrics: add metric for proposal timestamp difference (#7550) 2022-01-26 17:51:01 -05:00
William Banfield
4efc8a4485 add height for outlier message delays 2022-01-26 17:19:08 -05:00
4 changed files with 75 additions and 37 deletions

View File

@@ -76,6 +76,11 @@ type Metrics struct {
// timestamp and the timestamp of the latest prevote in a round where 100%
// of the voting power on the network issued prevotes.
FullPrevoteMessageDelay metrics.Gauge
// ProposalTimestampDifference is the difference between the timestamp in
// the proposal message and the local time of the validator at the time
// that the validator received the message.
ProposalTimestampDifference metrics.Histogram
}
// PrometheusMetrics returns Metrics built using the Prometheus client library.
@@ -208,13 +213,22 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "quorum_prevote_message_delay",
Help: "Difference in seconds between the proposal timestamp and the timestamp " +
"of the latest prevote that achieved a quorum in the prevote step.",
}, labels).With(labelsAndValues...),
}, append(labels, "height")).With(labelsAndValues...),
FullPrevoteMessageDelay: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "full_prevote_message_delay",
Help: "Difference in seconds between the proposal timestamp and the timestamp " +
"of the latest prevote that achieved 100% of the voting power in the prevote step.",
}, append(labels, "height")).With(labelsAndValues...),
ProposalTimestampDifference: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "proposal_timestamp_difference",
Help: "Difference in seconds between the timestamp in the proposal " +
"message and the local time when the message was received. " +
"Only calculated when a new block is proposed.",
Buckets: []float64{-10, -.5, -.025, 0, .1, .5, 1, 1.5, 2, 10},
}, labels).With(labelsAndValues...),
}
}
@@ -239,14 +253,15 @@ func NopMetrics() *Metrics {
BlockIntervalSeconds: discard.NewHistogram(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
QuorumPrevoteMessageDelay: discard.NewGauge(),
FullPrevoteMessageDelay: discard.NewGauge(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
QuorumPrevoteMessageDelay: discard.NewGauge(),
FullPrevoteMessageDelay: discard.NewGauge(),
ProposalTimestampDifference: discard.NewHistogram(),
}
}

View File

@@ -308,13 +308,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -334,7 +334,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
cs.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
default:
// don't punish (leave room for soft upgrades)

View File

@@ -44,8 +44,9 @@ var msgQueueSize = 1000
// msgs from the reactor which may update the state
type msgInfo struct {
Msg Message `json:"msg"`
PeerID p2p.ID `json:"peer_key"`
Msg Message `json:"msg"`
PeerID p2p.ID `json:"peer_key"`
ReceiveTime time.Time `json:"receive_time"`
}
// internally generated messages which may update the state
@@ -129,7 +130,7 @@ type State struct {
// some functions can be overwritten for testing
decideProposal func(height int64, round int32)
doPrevote func(height int64, round int32)
setProposal func(proposal *types.Proposal) error
setProposal func(proposal *types.Proposal, time time.Time) error
// closed when we finish shutting down
done chan struct{}
@@ -457,9 +458,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) {
// AddVote inputs a vote.
func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, "", time.Now()}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID, time.Now()}
}
// TODO: wait for event?!
@@ -470,9 +471,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error
func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, "", time.Now()}
} else {
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID, time.Now()}
}
// TODO: wait for event?!
@@ -483,9 +484,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.ID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, "", time.Now()}
} else {
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID, time.Now()}
}
// TODO: wait for event?!
@@ -662,6 +663,7 @@ func (cs *State) updateToState(state sm.State) {
cs.Validators = validators
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.LockedRound = -1
@@ -814,7 +816,7 @@ func (cs *State) handleMsg(mi msgInfo) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.setProposal(msg.Proposal)
err = cs.setProposal(msg.Proposal, mi.ReceiveTime)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
@@ -994,6 +996,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
} else {
logger.Debug("resetting proposal info")
cs.Proposal = nil
cs.ProposalReceiveTime = time.Time{}
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
}
@@ -1134,11 +1137,11 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, "", time.Now()})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, "", time.Now()})
}
cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
@@ -1789,7 +1792,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
//-----------------------------------------------------------------------------
func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error {
// Already have one
// TODO: possibly catch double proposals
if cs.Proposal != nil {
@@ -1817,6 +1820,8 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
proposal.Signature = p.Signature
cs.Proposal = proposal
cs.ProposalReceiveTime = recvTime
cs.calculateProposalTimestampDifferenceMetric()
// We don't update cs.ProposalBlockParts if it is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
@@ -2227,7 +2232,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, "", time.Now()})
cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}
@@ -2294,12 +2299,22 @@ func (cs *State) calculatePrevoteMessageDelayMetrics() {
_, val := cs.Validators.GetByAddress(v.ValidatorAddress)
votingPowerSeen += val.VotingPower
if votingPowerSeen >= cs.Validators.TotalVotingPower()*2/3+1 {
cs.metrics.QuorumPrevoteMessageDelay.Set(v.Timestamp.Sub(cs.Proposal.Timestamp).Seconds())
delay := v.Timestamp.Sub(cs.Proposal.Timestamp)
if delay >= 10*time.Second {
cs.metrics.QuorumPrevoteMessageDelay.With("height", fmt.Sprintf("%d", cs.Height)).Set(delay.Seconds())
} else {
cs.metrics.QuorumPrevoteMessageDelay.With("height", "none").Set(delay.Seconds())
}
break
}
}
if ps.HasAll() {
cs.metrics.FullPrevoteMessageDelay.Set(pl[len(pl)-1].Timestamp.Sub(cs.Proposal.Timestamp).Seconds())
delay := pl[len(pl)-1].Timestamp.Sub(cs.Proposal.Timestamp)
if delay >= 10*time.Second {
cs.metrics.FullPrevoteMessageDelay.With("height", fmt.Sprintf("%d", cs.Height)).Set(delay.Seconds())
} else {
cs.metrics.FullPrevoteMessageDelay.With("height", "none").Set(delay.Seconds())
}
}
}
@@ -2359,3 +2374,10 @@ func repairWalFile(src, dst string) error {
return nil
}
// calculateProposalTimestampDifferenceMetric observes, on the
// ProposalTimestampDifference histogram, the number of seconds between the
// timestamp carried by the current proposal and the time stored in
// cs.ProposalReceiveTime (receive time minus proposal timestamp, so the value
// is negative when the proposal timestamp is later than the receive time).
//
// Nothing is recorded when there is no current proposal or when the
// proposal's POLRound is anything other than -1.
func (cs *State) calculateProposalTimestampDifferenceMetric() {
	proposal := cs.Proposal
	if proposal == nil || proposal.POLRound != -1 {
		return
	}

	elapsed := cs.ProposalReceiveTime.Sub(proposal.Timestamp)
	cs.metrics.ProposalTimestampDifference.Observe(elapsed.Seconds())
}

View File

@@ -71,14 +71,15 @@ type RoundState struct {
StartTime time.Time `json:"start_time"`
// Subjective time when +2/3 precommits for Block at Round were found
CommitTime time.Time `json:"commit_time"`
Validators *types.ValidatorSet `json:"validators"`
Proposal *types.Proposal `json:"proposal"`
ProposalBlock *types.Block `json:"proposal_block"`
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
LockedRound int32 `json:"locked_round"`
LockedBlock *types.Block `json:"locked_block"`
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
CommitTime time.Time `json:"commit_time"`
Validators *types.ValidatorSet `json:"validators"`
Proposal *types.Proposal `json:"proposal"`
ProposalReceiveTime time.Time `json:"proposal_receive_time"`
ProposalBlock *types.Block `json:"proposal_block"`
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
LockedRound int32 `json:"locked_round"`
LockedBlock *types.Block `json:"locked_block"`
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
// Last known round with POL for non-nil valid block.
ValidRound int32 `json:"valid_round"`