mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-13 07:11:13 +00:00
Compare commits
3 Commits
thane/v0.3
...
wb/pbts-me
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cdedf5f43a | ||
|
|
2aacab562a | ||
|
|
4efc8a4485 |
@@ -76,6 +76,11 @@ type Metrics struct {
|
||||
// timestamp and the timestamp of the latest prevote in a round where 100%
|
||||
// of the voting power on the network issued prevotes.
|
||||
FullPrevoteMessageDelay metrics.Gauge
|
||||
|
||||
// ProposalTimestampDifference is the difference between the timestamp in
|
||||
// the proposal message and the local time of the validator at the time
|
||||
// that the validator received the message.
|
||||
ProposalTimestampDifference metrics.Histogram
|
||||
}
|
||||
|
||||
// PrometheusMetrics returns Metrics built using the Prometheus client library.
|
||||
@@ -208,13 +213,22 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "quorum_prevote_message_delay",
|
||||
Help: "Difference in seconds between the proposal timestamp and the timestamp " +
|
||||
"of the latest prevote that achieved a quorum in the prevote step.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
}, append(labels, "height")).With(labelsAndValues...),
|
||||
FullPrevoteMessageDelay: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "full_prevote_message_delay",
|
||||
Help: "Difference in seconds between the proposal timestamp and the timestamp " +
|
||||
"of the latest prevote that achieved 100% of the voting power in the prevote step.",
|
||||
}, append(labels, "height")).With(labelsAndValues...),
|
||||
ProposalTimestampDifference: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "proposal_timestamp_difference",
|
||||
Help: "Difference in seconds between the timestamp in the proposal " +
|
||||
"message and the local time when the message was received. " +
|
||||
"Only calculated when a new block is proposed.",
|
||||
Buckets: []float64{-10, -.5, -.025, 0, .1, .5, 1, 1.5, 2, 10},
|
||||
}, labels).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
@@ -239,14 +253,15 @@ func NopMetrics() *Metrics {
|
||||
|
||||
BlockIntervalSeconds: discard.NewHistogram(),
|
||||
|
||||
NumTxs: discard.NewGauge(),
|
||||
BlockSizeBytes: discard.NewGauge(),
|
||||
TotalTxs: discard.NewGauge(),
|
||||
CommittedHeight: discard.NewGauge(),
|
||||
FastSyncing: discard.NewGauge(),
|
||||
StateSyncing: discard.NewGauge(),
|
||||
BlockParts: discard.NewCounter(),
|
||||
QuorumPrevoteMessageDelay: discard.NewGauge(),
|
||||
FullPrevoteMessageDelay: discard.NewGauge(),
|
||||
NumTxs: discard.NewGauge(),
|
||||
BlockSizeBytes: discard.NewGauge(),
|
||||
TotalTxs: discard.NewGauge(),
|
||||
CommittedHeight: discard.NewGauge(),
|
||||
FastSyncing: discard.NewGauge(),
|
||||
StateSyncing: discard.NewGauge(),
|
||||
BlockParts: discard.NewCounter(),
|
||||
QuorumPrevoteMessageDelay: discard.NewGauge(),
|
||||
FullPrevoteMessageDelay: discard.NewGauge(),
|
||||
ProposalTimestampDifference: discard.NewHistogram(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -308,13 +308,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
switch msg := msg.(type) {
|
||||
case *ProposalMessage:
|
||||
ps.SetHasProposal(msg.Proposal)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
|
||||
case *ProposalPOLMessage:
|
||||
ps.ApplyProposalPOLMessage(msg)
|
||||
case *BlockPartMessage:
|
||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
|
||||
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
|
||||
default:
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
@@ -334,7 +334,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||
ps.SetHasVote(msg.Vote)
|
||||
|
||||
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
cs.peerMsgQueue <- msgInfo{msg, src.ID(), time.Now()}
|
||||
|
||||
default:
|
||||
// don't punish (leave room for soft upgrades)
|
||||
|
||||
@@ -44,8 +44,9 @@ var msgQueueSize = 1000
|
||||
|
||||
// msgs from the reactor which may update the state
|
||||
type msgInfo struct {
|
||||
Msg Message `json:"msg"`
|
||||
PeerID p2p.ID `json:"peer_key"`
|
||||
Msg Message `json:"msg"`
|
||||
PeerID p2p.ID `json:"peer_key"`
|
||||
ReceiveTime time.Time `json:"receive_time"`
|
||||
}
|
||||
|
||||
// internally generated messages which may update the state
|
||||
@@ -129,7 +130,7 @@ type State struct {
|
||||
// some functions can be overwritten for testing
|
||||
decideProposal func(height int64, round int32)
|
||||
doPrevote func(height int64, round int32)
|
||||
setProposal func(proposal *types.Proposal) error
|
||||
setProposal func(proposal *types.Proposal, time time.Time) error
|
||||
|
||||
// closed when we finish shutting down
|
||||
done chan struct{}
|
||||
@@ -457,9 +458,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) {
|
||||
// AddVote inputs a vote.
|
||||
func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
|
||||
if peerID == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
|
||||
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, "", time.Now()}
|
||||
} else {
|
||||
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
|
||||
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID, time.Now()}
|
||||
}
|
||||
|
||||
// TODO: wait for event?!
|
||||
@@ -470,9 +471,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error
|
||||
func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
|
||||
|
||||
if peerID == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
|
||||
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, "", time.Now()}
|
||||
} else {
|
||||
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
|
||||
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID, time.Now()}
|
||||
}
|
||||
|
||||
// TODO: wait for event?!
|
||||
@@ -483,9 +484,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
|
||||
func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.ID) error {
|
||||
|
||||
if peerID == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
|
||||
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, "", time.Now()}
|
||||
} else {
|
||||
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
|
||||
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID, time.Now()}
|
||||
}
|
||||
|
||||
// TODO: wait for event?!
|
||||
@@ -662,6 +663,7 @@ func (cs *State) updateToState(state sm.State) {
|
||||
|
||||
cs.Validators = validators
|
||||
cs.Proposal = nil
|
||||
cs.ProposalReceiveTime = time.Time{}
|
||||
cs.ProposalBlock = nil
|
||||
cs.ProposalBlockParts = nil
|
||||
cs.LockedRound = -1
|
||||
@@ -814,7 +816,7 @@ func (cs *State) handleMsg(mi msgInfo) {
|
||||
case *ProposalMessage:
|
||||
// will not cause transition.
|
||||
// once proposal is set, we can receive block parts
|
||||
err = cs.setProposal(msg.Proposal)
|
||||
err = cs.setProposal(msg.Proposal, mi.ReceiveTime)
|
||||
|
||||
case *BlockPartMessage:
|
||||
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
|
||||
@@ -994,6 +996,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
|
||||
} else {
|
||||
logger.Debug("resetting proposal info")
|
||||
cs.Proposal = nil
|
||||
cs.ProposalReceiveTime = time.Time{}
|
||||
cs.ProposalBlock = nil
|
||||
cs.ProposalBlockParts = nil
|
||||
}
|
||||
@@ -1134,11 +1137,11 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
|
||||
proposal.Signature = p.Signature
|
||||
|
||||
// send proposal and block parts on internal msg queue
|
||||
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, "", time.Now()})
|
||||
|
||||
for i := 0; i < int(blockParts.Total()); i++ {
|
||||
part := blockParts.GetPart(i)
|
||||
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, "", time.Now()})
|
||||
}
|
||||
|
||||
cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
|
||||
@@ -1789,7 +1792,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
|
||||
func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error {
|
||||
// Already have one
|
||||
// TODO: possibly catch double proposals
|
||||
if cs.Proposal != nil {
|
||||
@@ -1817,6 +1820,8 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
|
||||
|
||||
proposal.Signature = p.Signature
|
||||
cs.Proposal = proposal
|
||||
cs.ProposalReceiveTime = recvTime
|
||||
cs.calculateProposalTimestampDifferenceMetric()
|
||||
// We don't update cs.ProposalBlockParts if it is already set.
|
||||
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
|
||||
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
|
||||
@@ -2227,7 +2232,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
|
||||
// TODO: pass pubKey to signVote
|
||||
vote, err := cs.signVote(msgType, hash, header)
|
||||
if err == nil {
|
||||
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, "", time.Now()})
|
||||
cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
|
||||
return vote
|
||||
}
|
||||
@@ -2294,12 +2299,22 @@ func (cs *State) calculatePrevoteMessageDelayMetrics() {
|
||||
_, val := cs.Validators.GetByAddress(v.ValidatorAddress)
|
||||
votingPowerSeen += val.VotingPower
|
||||
if votingPowerSeen >= cs.Validators.TotalVotingPower()*2/3+1 {
|
||||
cs.metrics.QuorumPrevoteMessageDelay.Set(v.Timestamp.Sub(cs.Proposal.Timestamp).Seconds())
|
||||
delay := v.Timestamp.Sub(cs.Proposal.Timestamp)
|
||||
if delay >= 10*time.Second {
|
||||
cs.metrics.QuorumPrevoteMessageDelay.With("height", fmt.Sprintf("%d", cs.Height)).Set(delay.Seconds())
|
||||
} else {
|
||||
cs.metrics.QuorumPrevoteMessageDelay.With("height", "none").Set(delay.Seconds())
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if ps.HasAll() {
|
||||
cs.metrics.FullPrevoteMessageDelay.Set(pl[len(pl)-1].Timestamp.Sub(cs.Proposal.Timestamp).Seconds())
|
||||
delay := pl[len(pl)-1].Timestamp.Sub(cs.Proposal.Timestamp)
|
||||
if delay >= 10*time.Second {
|
||||
cs.metrics.FullPrevoteMessageDelay.With("height", fmt.Sprintf("%d", cs.Height)).Set(delay.Seconds())
|
||||
} else {
|
||||
cs.metrics.FullPrevoteMessageDelay.With("height", "none").Set(delay.Seconds())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2359,3 +2374,10 @@ func repairWalFile(src, dst string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *State) calculateProposalTimestampDifferenceMetric() {
|
||||
if cs.Proposal != nil && cs.Proposal.POLRound == -1 {
|
||||
cs.metrics.ProposalTimestampDifference.
|
||||
Observe(cs.ProposalReceiveTime.Sub(cs.Proposal.Timestamp).Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,14 +71,15 @@ type RoundState struct {
|
||||
StartTime time.Time `json:"start_time"`
|
||||
|
||||
// Subjective time when +2/3 precommits for Block at Round were found
|
||||
CommitTime time.Time `json:"commit_time"`
|
||||
Validators *types.ValidatorSet `json:"validators"`
|
||||
Proposal *types.Proposal `json:"proposal"`
|
||||
ProposalBlock *types.Block `json:"proposal_block"`
|
||||
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
|
||||
LockedRound int32 `json:"locked_round"`
|
||||
LockedBlock *types.Block `json:"locked_block"`
|
||||
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
|
||||
CommitTime time.Time `json:"commit_time"`
|
||||
Validators *types.ValidatorSet `json:"validators"`
|
||||
Proposal *types.Proposal `json:"proposal"`
|
||||
ProposalReceiveTime time.Time `json:"proposal_receive_time"`
|
||||
ProposalBlock *types.Block `json:"proposal_block"`
|
||||
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
|
||||
LockedRound int32 `json:"locked_round"`
|
||||
LockedBlock *types.Block `json:"locked_block"`
|
||||
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
|
||||
|
||||
// Last known round with POL for non-nil valid block.
|
||||
ValidRound int32 `json:"valid_round"`
|
||||
|
||||
Reference in New Issue
Block a user