mirror of
https://github.com/tendermint/tendermint.git
synced 2025-12-23 14:25:19 +00:00
maverick: reduce some duplication (#6052)
- Reduce duplication in messages and metrics. - merge WAL interfaces. Meant to push the developer to make changes in both places.
This commit is contained in:
@@ -1,220 +0,0 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/discard"
|
||||
|
||||
prometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
|
||||
// package.
|
||||
MetricsSubsystem = "consensus"
|
||||
)
|
||||
|
||||
// Metrics contains metrics exposed by this package.
|
||||
type Metrics struct {
|
||||
// Height of the chain.
|
||||
Height metrics.Gauge
|
||||
|
||||
// ValidatorLastSignedHeight of a validator.
|
||||
ValidatorLastSignedHeight metrics.Gauge
|
||||
|
||||
// Number of rounds.
|
||||
Rounds metrics.Gauge
|
||||
|
||||
// Number of validators.
|
||||
Validators metrics.Gauge
|
||||
// Total power of all validators.
|
||||
ValidatorsPower metrics.Gauge
|
||||
// Power of a validator.
|
||||
ValidatorPower metrics.Gauge
|
||||
// Amount of blocks missed by a validator.
|
||||
ValidatorMissedBlocks metrics.Gauge
|
||||
// Number of validators who did not sign.
|
||||
MissingValidators metrics.Gauge
|
||||
// Total power of the missing validators.
|
||||
MissingValidatorsPower metrics.Gauge
|
||||
// Number of validators who tried to double sign.
|
||||
ByzantineValidators metrics.Gauge
|
||||
// Total power of the byzantine validators.
|
||||
ByzantineValidatorsPower metrics.Gauge
|
||||
|
||||
// Time between this and the last block.
|
||||
BlockIntervalSeconds metrics.Histogram
|
||||
|
||||
// Number of transactions.
|
||||
NumTxs metrics.Gauge
|
||||
// Size of the block.
|
||||
BlockSizeBytes metrics.Gauge
|
||||
// Total number of transactions.
|
||||
TotalTxs metrics.Gauge
|
||||
// The latest block height.
|
||||
CommittedHeight metrics.Gauge
|
||||
// Whether or not a node is fast syncing. 1 if yes, 0 if no.
|
||||
FastSyncing metrics.Gauge
|
||||
// Whether or not a node is state syncing. 1 if yes, 0 if no.
|
||||
StateSyncing metrics.Gauge
|
||||
|
||||
// Number of blockparts transmitted by peer.
|
||||
BlockParts metrics.Counter
|
||||
}
|
||||
|
||||
// PrometheusMetrics returns Metrics build using Prometheus client library.
|
||||
// Optionally, labels can be provided along with their values ("foo",
|
||||
// "fooValue").
|
||||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
labels := []string{}
|
||||
for i := 0; i < len(labelsAndValues); i += 2 {
|
||||
labels = append(labels, labelsAndValues[i])
|
||||
}
|
||||
return &Metrics{
|
||||
Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "height",
|
||||
Help: "Height of the chain.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "rounds",
|
||||
Help: "Number of rounds.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
|
||||
Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validators",
|
||||
Help: "Number of validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_last_signed_height",
|
||||
Help: "Last signed height for a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_missed_blocks",
|
||||
Help: "Total missed blocks for a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validators_power",
|
||||
Help: "Total power of all validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_power",
|
||||
Help: "Power of a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "missing_validators",
|
||||
Help: "Number of validators who did not sign.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "missing_validators_power",
|
||||
Help: "Total power of the missing validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "byzantine_validators",
|
||||
Help: "Number of validators who tried to double sign.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "byzantine_validators_power",
|
||||
Help: "Total power of the byzantine validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "block_interval_seconds",
|
||||
Help: "Time between this and the last block.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "num_txs",
|
||||
Help: "Number of transactions.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "block_size_bytes",
|
||||
Help: "Size of the block.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "total_txs",
|
||||
Help: "Total number of transactions.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "latest_block_height",
|
||||
Help: "The latest block height.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "fast_syncing",
|
||||
Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "state_syncing",
|
||||
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "block_parts",
|
||||
Help: "Number of blockparts transmitted by peer.",
|
||||
}, append(labels, "peer_id")).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
|
||||
// NopMetrics returns no-op Metrics.
|
||||
func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
Height: discard.NewGauge(),
|
||||
|
||||
ValidatorLastSignedHeight: discard.NewGauge(),
|
||||
|
||||
Rounds: discard.NewGauge(),
|
||||
|
||||
Validators: discard.NewGauge(),
|
||||
ValidatorsPower: discard.NewGauge(),
|
||||
ValidatorPower: discard.NewGauge(),
|
||||
ValidatorMissedBlocks: discard.NewGauge(),
|
||||
MissingValidators: discard.NewGauge(),
|
||||
MissingValidatorsPower: discard.NewGauge(),
|
||||
ByzantineValidators: discard.NewGauge(),
|
||||
ByzantineValidatorsPower: discard.NewGauge(),
|
||||
|
||||
BlockIntervalSeconds: discard.NewHistogram(),
|
||||
|
||||
NumTxs: discard.NewGauge(),
|
||||
BlockSizeBytes: discard.NewGauge(),
|
||||
TotalTxs: discard.NewGauge(),
|
||||
CommittedHeight: discard.NewGauge(),
|
||||
FastSyncing: discard.NewGauge(),
|
||||
StateSyncing: discard.NewGauge(),
|
||||
BlockParts: discard.NewCounter(),
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package consensus
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -89,16 +90,16 @@ func DoublePrevoteMisbehavior() Misbehavior {
|
||||
}
|
||||
|
||||
// add our own vote
|
||||
cs.sendInternalMessage(msgInfo{&VoteMessage{prevote}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: prevote}, ""})
|
||||
|
||||
cs.Logger.Info("Sending conflicting votes")
|
||||
peers := cs.sw.Peers().List()
|
||||
// there has to be at least two other peers connected else this behavior works normally
|
||||
for idx, peer := range peers {
|
||||
if idx%2 == 0 { // sign the proposal block
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
|
||||
peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: prevote}))
|
||||
} else { // sign a nil block
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{nilPrevote}))
|
||||
peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: nilPrevote}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,10 +4,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/bits"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
@@ -15,267 +13,7 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// MsgToProto takes a consensus message type and returns the proto defined consensus message
|
||||
func MsgToProto(msg Message) (*tmcons.Message, error) {
|
||||
if msg == nil {
|
||||
return nil, errors.New("consensus: message is nil")
|
||||
}
|
||||
var pb tmcons.Message
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *NewRoundStepMessage:
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_NewRoundStep{
|
||||
NewRoundStep: &tmcons.NewRoundStep{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Step: uint32(msg.Step),
|
||||
SecondsSinceStartTime: msg.SecondsSinceStartTime,
|
||||
LastCommitRound: msg.LastCommitRound,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *NewValidBlockMessage:
|
||||
pbPartSetHeader := msg.BlockPartSetHeader.ToProto()
|
||||
pbBits := msg.BlockParts.ToProto()
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_NewValidBlock{
|
||||
NewValidBlock: &tmcons.NewValidBlock{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
BlockPartSetHeader: pbPartSetHeader,
|
||||
BlockParts: pbBits,
|
||||
IsCommit: msg.IsCommit,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *ProposalMessage:
|
||||
pbP := msg.Proposal.ToProto()
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_Proposal{
|
||||
Proposal: &tmcons.Proposal{
|
||||
Proposal: *pbP,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *ProposalPOLMessage:
|
||||
pbBits := msg.ProposalPOL.ToProto()
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_ProposalPol{
|
||||
ProposalPol: &tmcons.ProposalPOL{
|
||||
Height: msg.Height,
|
||||
ProposalPolRound: msg.ProposalPOLRound,
|
||||
ProposalPol: *pbBits,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *BlockPartMessage:
|
||||
parts, err := msg.Part.ToProto()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("msg to proto error: %w", err)
|
||||
}
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_BlockPart{
|
||||
BlockPart: &tmcons.BlockPart{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Part: *parts,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *VoteMessage:
|
||||
vote := msg.Vote.ToProto()
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_Vote{
|
||||
Vote: &tmcons.Vote{
|
||||
Vote: vote,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *HasVoteMessage:
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_HasVote{
|
||||
HasVote: &tmcons.HasVote{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
Index: msg.Index,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *VoteSetMaj23Message:
|
||||
bi := msg.BlockID.ToProto()
|
||||
pb = tmcons.Message{
|
||||
Sum: &tmcons.Message_VoteSetMaj23{
|
||||
VoteSetMaj23: &tmcons.VoteSetMaj23{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
BlockID: bi,
|
||||
},
|
||||
},
|
||||
}
|
||||
case *VoteSetBitsMessage:
|
||||
bi := msg.BlockID.ToProto()
|
||||
bits := msg.Votes.ToProto()
|
||||
|
||||
vsb := &tmcons.Message_VoteSetBits{
|
||||
VoteSetBits: &tmcons.VoteSetBits{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
BlockID: bi,
|
||||
},
|
||||
}
|
||||
|
||||
if bits != nil {
|
||||
vsb.VoteSetBits.Votes = *bits
|
||||
}
|
||||
|
||||
pb = tmcons.Message{
|
||||
Sum: vsb,
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
|
||||
}
|
||||
|
||||
return &pb, nil
|
||||
}
|
||||
|
||||
// MsgFromProto takes a consensus proto message and returns the native go type
|
||||
func MsgFromProto(msg *tmcons.Message) (Message, error) {
|
||||
if msg == nil {
|
||||
return nil, errors.New("consensus: nil message")
|
||||
}
|
||||
var pb Message
|
||||
|
||||
switch msg := msg.Sum.(type) {
|
||||
case *tmcons.Message_NewRoundStep:
|
||||
rs, err := tmmath.SafeConvertUint8(int64(msg.NewRoundStep.Step))
|
||||
// deny message based on possible overflow
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
|
||||
}
|
||||
pb = &NewRoundStepMessage{
|
||||
Height: msg.NewRoundStep.Height,
|
||||
Round: msg.NewRoundStep.Round,
|
||||
Step: cstypes.RoundStepType(rs),
|
||||
SecondsSinceStartTime: msg.NewRoundStep.SecondsSinceStartTime,
|
||||
LastCommitRound: msg.NewRoundStep.LastCommitRound,
|
||||
}
|
||||
case *tmcons.Message_NewValidBlock:
|
||||
pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parts to proto error: %w", err)
|
||||
}
|
||||
|
||||
pbBits := new(bits.BitArray)
|
||||
pbBits.FromProto(msg.NewValidBlock.BlockParts)
|
||||
|
||||
pb = &NewValidBlockMessage{
|
||||
Height: msg.NewValidBlock.Height,
|
||||
Round: msg.NewValidBlock.Round,
|
||||
BlockPartSetHeader: *pbPartSetHeader,
|
||||
BlockParts: pbBits,
|
||||
IsCommit: msg.NewValidBlock.IsCommit,
|
||||
}
|
||||
case *tmcons.Message_Proposal:
|
||||
pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("proposal msg to proto error: %w", err)
|
||||
}
|
||||
|
||||
pb = &ProposalMessage{
|
||||
Proposal: pbP,
|
||||
}
|
||||
case *tmcons.Message_ProposalPol:
|
||||
pbBits := new(bits.BitArray)
|
||||
pbBits.FromProto(&msg.ProposalPol.ProposalPol)
|
||||
pb = &ProposalPOLMessage{
|
||||
Height: msg.ProposalPol.Height,
|
||||
ProposalPOLRound: msg.ProposalPol.ProposalPolRound,
|
||||
ProposalPOL: pbBits,
|
||||
}
|
||||
case *tmcons.Message_BlockPart:
|
||||
parts, err := types.PartFromProto(&msg.BlockPart.Part)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("blockpart msg to proto error: %w", err)
|
||||
}
|
||||
pb = &BlockPartMessage{
|
||||
Height: msg.BlockPart.Height,
|
||||
Round: msg.BlockPart.Round,
|
||||
Part: parts,
|
||||
}
|
||||
case *tmcons.Message_Vote:
|
||||
vote, err := types.VoteFromProto(msg.Vote.Vote)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("vote msg to proto error: %w", err)
|
||||
}
|
||||
|
||||
pb = &VoteMessage{
|
||||
Vote: vote,
|
||||
}
|
||||
case *tmcons.Message_HasVote:
|
||||
pb = &HasVoteMessage{
|
||||
Height: msg.HasVote.Height,
|
||||
Round: msg.HasVote.Round,
|
||||
Type: msg.HasVote.Type,
|
||||
Index: msg.HasVote.Index,
|
||||
}
|
||||
case *tmcons.Message_VoteSetMaj23:
|
||||
bi, err := types.BlockIDFromProto(&msg.VoteSetMaj23.BlockID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("voteSetMaj23 msg to proto error: %w", err)
|
||||
}
|
||||
pb = &VoteSetMaj23Message{
|
||||
Height: msg.VoteSetMaj23.Height,
|
||||
Round: msg.VoteSetMaj23.Round,
|
||||
Type: msg.VoteSetMaj23.Type,
|
||||
BlockID: *bi,
|
||||
}
|
||||
case *tmcons.Message_VoteSetBits:
|
||||
bi, err := types.BlockIDFromProto(&msg.VoteSetBits.BlockID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("voteSetBits msg to proto error: %w", err)
|
||||
}
|
||||
bits := new(bits.BitArray)
|
||||
bits.FromProto(&msg.VoteSetBits.Votes)
|
||||
|
||||
pb = &VoteSetBitsMessage{
|
||||
Height: msg.VoteSetBits.Height,
|
||||
Round: msg.VoteSetBits.Round,
|
||||
Type: msg.VoteSetBits.Type,
|
||||
BlockID: *bi,
|
||||
Votes: bits,
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
|
||||
}
|
||||
|
||||
if err := pb.ValidateBasic(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pb, nil
|
||||
}
|
||||
|
||||
// MustEncode takes the reactors msg, makes it proto and marshals it
|
||||
// this mimics `MustMarshalBinaryBare` in that is panics on error
|
||||
func MustEncode(msg Message) []byte {
|
||||
pb, err := MsgToProto(msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
enc, err := proto.Marshal(pb)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return enc
|
||||
}
|
||||
|
||||
// WALToProto takes a WAL message and return a proto walMessage and error
|
||||
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
|
||||
func WALToProto(msg tmcon.WALMessage) (*tmcons.WALMessage, error) {
|
||||
var pb tmcons.WALMessage
|
||||
|
||||
switch msg := msg.(type) {
|
||||
@@ -290,7 +28,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
|
||||
},
|
||||
}
|
||||
case msgInfo:
|
||||
consMsg, err := MsgToProto(msg.Msg)
|
||||
consMsg, err := tmcon.MsgToProto(msg.Msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -313,7 +51,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
|
||||
},
|
||||
},
|
||||
}
|
||||
case EndHeightMessage:
|
||||
case tmcon.EndHeightMessage:
|
||||
pb = tmcons.WALMessage{
|
||||
Sum: &tmcons.WALMessage_EndHeight{
|
||||
EndHeight: &tmcons.EndHeight{
|
||||
@@ -329,11 +67,11 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
|
||||
}
|
||||
|
||||
// WALFromProto takes a proto wal message and return a consensus walMessage and error
|
||||
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
|
||||
func WALFromProto(msg *tmcons.WALMessage) (tmcon.WALMessage, error) {
|
||||
if msg == nil {
|
||||
return nil, errors.New("nil WAL message")
|
||||
}
|
||||
var pb WALMessage
|
||||
var pb tmcon.WALMessage
|
||||
|
||||
switch msg := msg.Sum.(type) {
|
||||
case *tmcons.WALMessage_EventDataRoundState:
|
||||
@@ -343,7 +81,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
|
||||
Step: msg.EventDataRoundState.Step,
|
||||
}
|
||||
case *tmcons.WALMessage_MsgInfo:
|
||||
walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
|
||||
walMsg, err := tmcon.MsgFromProto(&msg.MsgInfo.Msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("msgInfo from proto error: %w", err)
|
||||
}
|
||||
@@ -366,7 +104,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
|
||||
}
|
||||
return pb, nil
|
||||
case *tmcons.WALMessage_EndHeight:
|
||||
pb := EndHeightMessage{
|
||||
pb := tmcon.EndHeightMessage{
|
||||
Height: msg.EndHeight.Height,
|
||||
}
|
||||
return pb, nil
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/bits"
|
||||
tmevents "github.com/tendermint/tendermint/libs/events"
|
||||
@@ -47,7 +48,7 @@ type Reactor struct {
|
||||
waitSync bool
|
||||
eventBus *types.EventBus
|
||||
|
||||
Metrics *Metrics
|
||||
Metrics *tmcon.Metrics
|
||||
}
|
||||
|
||||
type ReactorOption func(*Reactor)
|
||||
@@ -58,7 +59,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
|
||||
conR := &Reactor{
|
||||
conS: consensusState,
|
||||
waitSync: waitSync,
|
||||
Metrics: NopMetrics(),
|
||||
Metrics: tmcon.NopMetrics(),
|
||||
}
|
||||
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
|
||||
|
||||
@@ -252,7 +253,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
switch chID {
|
||||
case StateChannel:
|
||||
switch msg := msg.(type) {
|
||||
case *NewRoundStepMessage:
|
||||
case *tmcon.NewRoundStepMessage:
|
||||
conR.conS.mtx.Lock()
|
||||
initialHeight := conR.conS.state.InitialHeight
|
||||
conR.conS.mtx.Unlock()
|
||||
@@ -262,11 +263,11 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
return
|
||||
}
|
||||
ps.ApplyNewRoundStepMessage(msg)
|
||||
case *NewValidBlockMessage:
|
||||
case *tmcon.NewValidBlockMessage:
|
||||
ps.ApplyNewValidBlockMessage(msg)
|
||||
case *HasVoteMessage:
|
||||
case *tmcon.HasVoteMessage:
|
||||
ps.ApplyHasVoteMessage(msg)
|
||||
case *VoteSetMaj23Message:
|
||||
case *tmcon.VoteSetMaj23Message:
|
||||
cs := conR.conS
|
||||
cs.mtx.Lock()
|
||||
height, votes := cs.Height, cs.Votes
|
||||
@@ -291,7 +292,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
default:
|
||||
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
|
||||
}
|
||||
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
|
||||
src.TrySend(VoteSetBitsChannel, tmcon.MustEncode(&tmcon.VoteSetBitsMessage{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
@@ -308,12 +309,12 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
return
|
||||
}
|
||||
switch msg := msg.(type) {
|
||||
case *ProposalMessage:
|
||||
case *tmcon.ProposalMessage:
|
||||
ps.SetHasProposal(msg.Proposal)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
case *ProposalPOLMessage:
|
||||
case *tmcon.ProposalPOLMessage:
|
||||
ps.ApplyProposalPOLMessage(msg)
|
||||
case *BlockPartMessage:
|
||||
case *tmcon.BlockPartMessage:
|
||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
|
||||
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
@@ -327,7 +328,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
return
|
||||
}
|
||||
switch msg := msg.(type) {
|
||||
case *VoteMessage:
|
||||
case *tmcon.VoteMessage:
|
||||
cs := conR.conS
|
||||
cs.mtx.RLock()
|
||||
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
|
||||
@@ -349,7 +350,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
return
|
||||
}
|
||||
switch msg := msg.(type) {
|
||||
case *VoteSetBitsMessage:
|
||||
case *tmcon.VoteSetBitsMessage:
|
||||
cs := conR.conS
|
||||
cs.mtx.Lock()
|
||||
height, votes := cs.Height, cs.Votes
|
||||
@@ -429,29 +430,29 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
|
||||
|
||||
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
|
||||
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(nrsMsg))
|
||||
}
|
||||
|
||||
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
|
||||
csMsg := &NewValidBlockMessage{
|
||||
csMsg := &tmcon.NewValidBlockMessage{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
|
||||
BlockParts: rs.ProposalBlockParts.BitArray(),
|
||||
IsCommit: rs.Step == cstypes.RoundStepCommit,
|
||||
}
|
||||
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
|
||||
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(csMsg))
|
||||
}
|
||||
|
||||
// Broadcasts HasVoteMessage to peers that care.
|
||||
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
msg := &HasVoteMessage{
|
||||
msg := &tmcon.HasVoteMessage{
|
||||
Height: vote.Height,
|
||||
Round: vote.Round,
|
||||
Type: vote.Type,
|
||||
Index: vote.ValidatorIndex,
|
||||
}
|
||||
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
|
||||
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(msg))
|
||||
/*
|
||||
// TODO: Make this broadcast more selective.
|
||||
for _, peer := range conR.Switch.Peers().List() {
|
||||
@@ -472,8 +473,8 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
*/
|
||||
}
|
||||
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
|
||||
nrsMsg = &NewRoundStepMessage{
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcon.NewRoundStepMessage) {
|
||||
nrsMsg = &tmcon.NewRoundStepMessage{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Step: rs.Step,
|
||||
@@ -486,7 +487,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
|
||||
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
|
||||
rs := conR.conS.GetRoundState()
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
peer.Send(StateChannel, MustEncode(nrsMsg))
|
||||
peer.Send(StateChannel, tmcon.MustEncode(nrsMsg))
|
||||
}
|
||||
|
||||
func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
|
||||
@@ -506,13 +507,13 @@ OUTER_LOOP:
|
||||
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
|
||||
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
|
||||
part := rs.ProposalBlockParts.GetPart(index)
|
||||
msg := &BlockPartMessage{
|
||||
msg := &tmcon.BlockPartMessage{
|
||||
Height: rs.Height, // This tells peer that this part applies to us.
|
||||
Round: rs.Round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
}
|
||||
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
|
||||
if peer.Send(DataChannel, MustEncode(msg)) {
|
||||
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
}
|
||||
continue OUTER_LOOP
|
||||
@@ -555,9 +556,9 @@ OUTER_LOOP:
|
||||
if rs.Proposal != nil && !prs.Proposal {
|
||||
// Proposal: share the proposal metadata with peer.
|
||||
{
|
||||
msg := &ProposalMessage{Proposal: rs.Proposal}
|
||||
msg := &tmcon.ProposalMessage{Proposal: rs.Proposal}
|
||||
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
|
||||
if peer.Send(DataChannel, MustEncode(msg)) {
|
||||
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
|
||||
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
|
||||
ps.SetHasProposal(rs.Proposal)
|
||||
}
|
||||
@@ -567,13 +568,13 @@ OUTER_LOOP:
|
||||
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
|
||||
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
|
||||
if 0 <= rs.Proposal.POLRound {
|
||||
msg := &ProposalPOLMessage{
|
||||
msg := &tmcon.ProposalPOLMessage{
|
||||
Height: rs.Height,
|
||||
ProposalPOLRound: rs.Proposal.POLRound,
|
||||
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
|
||||
}
|
||||
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
|
||||
peer.Send(DataChannel, MustEncode(msg))
|
||||
peer.Send(DataChannel, tmcon.MustEncode(msg))
|
||||
}
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
@@ -610,13 +611,13 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
|
||||
return
|
||||
}
|
||||
// Send the part
|
||||
msg := &BlockPartMessage{
|
||||
msg := &tmcon.BlockPartMessage{
|
||||
Height: prs.Height, // Not our height, so it doesn't matter.
|
||||
Round: prs.Round, // Not our height, so it doesn't matter.
|
||||
Part: part,
|
||||
}
|
||||
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
|
||||
if peer.Send(DataChannel, MustEncode(msg)) {
|
||||
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
} else {
|
||||
logger.Debug("Sending block part for catchup failed")
|
||||
@@ -773,7 +774,7 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrevoteType,
|
||||
@@ -790,7 +791,7 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
@@ -807,7 +808,7 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
|
||||
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.ProposalPOLRound,
|
||||
Type: tmproto.PrevoteType,
|
||||
@@ -827,7 +828,7 @@ OUTER_LOOP:
|
||||
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
|
||||
prs.Height >= conR.conS.blockStore.Base() {
|
||||
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: commit.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
@@ -866,11 +867,11 @@ func (conR *Reactor) peerStatsRoutine() {
|
||||
panic(fmt.Sprintf("Peer %v has no state", peer))
|
||||
}
|
||||
switch msg.Msg.(type) {
|
||||
case *VoteMessage:
|
||||
case *tmcon.VoteMessage:
|
||||
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
|
||||
conR.Switch.MarkPeerAsGood(peer)
|
||||
}
|
||||
case *BlockPartMessage:
|
||||
case *tmcon.BlockPartMessage:
|
||||
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
|
||||
conR.Switch.MarkPeerAsGood(peer)
|
||||
}
|
||||
@@ -908,7 +909,7 @@ func (conR *Reactor) StringIndented(indent string) string {
|
||||
}
|
||||
|
||||
// ReactorMetrics sets the metrics
|
||||
func ReactorMetrics(metrics *Metrics) ReactorOption {
|
||||
func ReactorMetrics(metrics *tmcon.Metrics) ReactorOption {
|
||||
return func(conR *Reactor) { conR.Metrics = metrics }
|
||||
}
|
||||
|
||||
@@ -1046,9 +1047,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
|
||||
// Returns true if vote was sent.
|
||||
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
|
||||
if vote, ok := ps.PickVoteToSend(votes); ok {
|
||||
msg := &VoteMessage{vote}
|
||||
msg := &tmcon.VoteMessage{Vote: vote}
|
||||
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
|
||||
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
|
||||
if ps.peer.Send(VoteChannel, tmcon.MustEncode(msg)) {
|
||||
ps.SetHasVote(vote)
|
||||
return true
|
||||
}
|
||||
@@ -1255,7 +1256,7 @@ func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.Sign
|
||||
}
|
||||
|
||||
// ApplyNewRoundStepMessage updates the peer state for the new round.
|
||||
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
|
||||
func (ps *PeerState) ApplyNewRoundStepMessage(msg *tmcon.NewRoundStepMessage) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
@@ -1308,7 +1309,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
|
||||
}
|
||||
|
||||
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
|
||||
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
|
||||
func (ps *PeerState) ApplyNewValidBlockMessage(msg *tmcon.NewValidBlockMessage) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
@@ -1325,7 +1326,7 @@ func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
|
||||
}
|
||||
|
||||
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
|
||||
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
|
||||
func (ps *PeerState) ApplyProposalPOLMessage(msg *tmcon.ProposalPOLMessage) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
@@ -1342,7 +1343,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
|
||||
}
|
||||
|
||||
// ApplyHasVoteMessage updates the peer state for the new vote.
|
||||
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
|
||||
func (ps *PeerState) ApplyHasVoteMessage(msg *tmcon.HasVoteMessage) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
@@ -1358,7 +1359,7 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
|
||||
// `ourVotes` is a BitArray of votes we have for msg.BlockID
|
||||
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
|
||||
// we conservatively overwrite ps's votes w/ msg.Votes.
|
||||
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
|
||||
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *tmcon.VoteSetBitsMessage, ourVotes *bits.BitArray) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
@@ -1395,12 +1396,6 @@ func (ps *PeerState) StringIndented(indent string) string {
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
// Message is a message that can be sent and received on the Reactor
|
||||
type Message interface {
|
||||
ValidateBasic() error
|
||||
}
|
||||
|
||||
// func init() {
|
||||
// tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage")
|
||||
@@ -1414,307 +1409,11 @@ type Message interface {
|
||||
// tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
|
||||
// }
|
||||
|
||||
func decodeMsg(bz []byte) (msg Message, err error) {
|
||||
func decodeMsg(bz []byte) (msg tmcon.Message, err error) {
|
||||
pb := &tmcons.Message{}
|
||||
if err = proto.Unmarshal(bz, pb); err != nil {
|
||||
return msg, err
|
||||
}
|
||||
|
||||
return MsgFromProto(pb)
|
||||
return tmcon.MsgFromProto(pb)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
|
||||
// For every height/round/step transition
|
||||
type NewRoundStepMessage struct {
|
||||
Height int64
|
||||
Round int32
|
||||
Step cstypes.RoundStepType
|
||||
SecondsSinceStartTime int64
|
||||
LastCommitRound int32
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *NewRoundStepMessage) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.Round < 0 {
|
||||
return errors.New("negative Round")
|
||||
}
|
||||
if !m.Step.IsValid() {
|
||||
return errors.New("invalid Step")
|
||||
}
|
||||
|
||||
// NOTE: SecondsSinceStartTime may be negative
|
||||
|
||||
// LastCommitRound will be -1 for the initial height, but we don't know what height this is
|
||||
// since it can be specified in genesis. The reactor will have to validate this via
|
||||
// ValidateHeight().
|
||||
if m.LastCommitRound < -1 {
|
||||
return errors.New("invalid LastCommitRound (cannot be < -1)")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateHeight validates the height given the chain's initial height.
|
||||
func (m *NewRoundStepMessage) ValidateHeight(initialHeight int64) error {
|
||||
if m.Height < initialHeight {
|
||||
return fmt.Errorf("invalid Height %v (lower than initial height %v)",
|
||||
m.Height, initialHeight)
|
||||
}
|
||||
if m.Height == initialHeight && m.LastCommitRound != -1 {
|
||||
return fmt.Errorf("invalid LastCommitRound %v (must be -1 for initial height %v)",
|
||||
m.LastCommitRound, initialHeight)
|
||||
}
|
||||
if m.Height > initialHeight && m.LastCommitRound < 0 {
|
||||
return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint
|
||||
initialHeight)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *NewRoundStepMessage) String() string {
|
||||
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
|
||||
m.Height, m.Round, m.Step, m.LastCommitRound)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// NewValidBlockMessage is sent when a validator observes a valid block B in some round r,
|
||||
// i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r.
|
||||
// In case the block is also committed, then IsCommit flag is set to true.
|
||||
type NewValidBlockMessage struct {
|
||||
Height int64
|
||||
Round int32
|
||||
BlockPartSetHeader types.PartSetHeader
|
||||
BlockParts *bits.BitArray
|
||||
IsCommit bool
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *NewValidBlockMessage) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.Round < 0 {
|
||||
return errors.New("negative Round")
|
||||
}
|
||||
if err := m.BlockPartSetHeader.ValidateBasic(); err != nil {
|
||||
return fmt.Errorf("wrong BlockPartSetHeader: %v", err)
|
||||
}
|
||||
if m.BlockParts.Size() == 0 {
|
||||
return errors.New("empty blockParts")
|
||||
}
|
||||
if m.BlockParts.Size() != int(m.BlockPartSetHeader.Total) {
|
||||
return fmt.Errorf("blockParts bit array size %d not equal to BlockPartSetHeader.Total %d",
|
||||
m.BlockParts.Size(),
|
||||
m.BlockPartSetHeader.Total)
|
||||
}
|
||||
if m.BlockParts.Size() > int(types.MaxBlockPartsCount) {
|
||||
return fmt.Errorf("blockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *NewValidBlockMessage) String() string {
|
||||
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]",
|
||||
m.Height, m.Round, m.BlockPartSetHeader, m.BlockParts, m.IsCommit)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// ProposalMessage is sent when a new block is proposed.
|
||||
type ProposalMessage struct {
|
||||
Proposal *types.Proposal
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *ProposalMessage) ValidateBasic() error {
|
||||
return m.Proposal.ValidateBasic()
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *ProposalMessage) String() string {
|
||||
return fmt.Sprintf("[Proposal %v]", m.Proposal)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
|
||||
type ProposalPOLMessage struct {
|
||||
Height int64
|
||||
ProposalPOLRound int32
|
||||
ProposalPOL *bits.BitArray
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *ProposalPOLMessage) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.ProposalPOLRound < 0 {
|
||||
return errors.New("negative ProposalPOLRound")
|
||||
}
|
||||
if m.ProposalPOL.Size() == 0 {
|
||||
return errors.New("empty ProposalPOL bit array")
|
||||
}
|
||||
if m.ProposalPOL.Size() > types.MaxVotesCount {
|
||||
return fmt.Errorf("proposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *ProposalPOLMessage) String() string {
|
||||
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// BlockPartMessage is sent when gossipping a piece of the proposed block.
|
||||
type BlockPartMessage struct {
|
||||
Height int64
|
||||
Round int32
|
||||
Part *types.Part
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *BlockPartMessage) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.Round < 0 {
|
||||
return errors.New("negative Round")
|
||||
}
|
||||
if err := m.Part.ValidateBasic(); err != nil {
|
||||
return fmt.Errorf("wrong Part: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *BlockPartMessage) String() string {
|
||||
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// VoteMessage is sent when voting for a proposal (or lack thereof).
|
||||
type VoteMessage struct {
|
||||
Vote *types.Vote
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *VoteMessage) ValidateBasic() error {
|
||||
return m.Vote.ValidateBasic()
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *VoteMessage) String() string {
|
||||
return fmt.Sprintf("[Vote %v]", m.Vote)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// HasVoteMessage is sent to indicate that a particular vote has been received.
|
||||
type HasVoteMessage struct {
|
||||
Height int64
|
||||
Round int32
|
||||
Type tmproto.SignedMsgType
|
||||
Index int32
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *HasVoteMessage) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.Round < 0 {
|
||||
return errors.New("negative Round")
|
||||
}
|
||||
if !types.IsVoteTypeValid(m.Type) {
|
||||
return errors.New("invalid Type")
|
||||
}
|
||||
if m.Index < 0 {
|
||||
return errors.New("negative Index")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *HasVoteMessage) String() string {
|
||||
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
|
||||
type VoteSetMaj23Message struct {
|
||||
Height int64
|
||||
Round int32
|
||||
Type tmproto.SignedMsgType
|
||||
BlockID types.BlockID
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *VoteSetMaj23Message) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.Round < 0 {
|
||||
return errors.New("negative Round")
|
||||
}
|
||||
if !types.IsVoteTypeValid(m.Type) {
|
||||
return errors.New("invalid Type")
|
||||
}
|
||||
if err := m.BlockID.ValidateBasic(); err != nil {
|
||||
return fmt.Errorf("wrong BlockID: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *VoteSetMaj23Message) String() string {
|
||||
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
|
||||
type VoteSetBitsMessage struct {
|
||||
Height int64
|
||||
Round int32
|
||||
Type tmproto.SignedMsgType
|
||||
BlockID types.BlockID
|
||||
Votes *bits.BitArray
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *VoteSetBitsMessage) ValidateBasic() error {
|
||||
if m.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if !types.IsVoteTypeValid(m.Type) {
|
||||
return errors.New("invalid Type")
|
||||
}
|
||||
if err := m.BlockID.ValidateBasic(); err != nil {
|
||||
return fmt.Errorf("wrong BlockID: %v", err)
|
||||
}
|
||||
// NOTE: Votes.Size() can be zero if the node does not have any
|
||||
if m.Votes.Size() > types.MaxVotesCount {
|
||||
return fmt.Errorf("votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *VoteSetBitsMessage) String() string {
|
||||
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto/merkle"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
@@ -35,9 +36,9 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
|
||||
// Unmarshal and apply a single message to the consensus state as if it were
|
||||
// received in receiveRoutine. Lines that start with "#" are ignored.
|
||||
// NOTE: receiveRoutine should not be running.
|
||||
func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
|
||||
func (cs *State) readReplayMessage(msg *tmcon.TimedWALMessage, newStepSub types.Subscription) error {
|
||||
// Skip meta messages which exist for demarcating boundaries.
|
||||
if _, ok := msg.Msg.(EndHeightMessage); ok {
|
||||
if _, ok := msg.Msg.(tmcon.EndHeightMessage); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -66,13 +67,13 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr
|
||||
peerID = "local"
|
||||
}
|
||||
switch msg := m.Msg.(type) {
|
||||
case *ProposalMessage:
|
||||
case *tmcon.ProposalMessage:
|
||||
p := msg.Proposal
|
||||
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
|
||||
p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
|
||||
case *BlockPartMessage:
|
||||
case *tmcon.BlockPartMessage:
|
||||
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
|
||||
case *VoteMessage:
|
||||
case *tmcon.VoteMessage:
|
||||
v := msg.Vote
|
||||
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
|
||||
"blockID", v.BlockID, "peer", peerID)
|
||||
@@ -102,7 +103,7 @@ func (cs *State) catchupReplay(csHeight int64) error {
|
||||
// this check (since we can crash after writing #ENDHEIGHT).
|
||||
//
|
||||
// Ignore data corruption errors since this is a sanity check.
|
||||
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
|
||||
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -125,7 +126,7 @@ func (cs *State) catchupReplay(csHeight int64) error {
|
||||
if csHeight == cs.state.InitialHeight {
|
||||
endHeight = 0
|
||||
}
|
||||
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
|
||||
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true})
|
||||
if err == io.EOF {
|
||||
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", endHeight)
|
||||
} else if err != nil {
|
||||
@@ -138,7 +139,7 @@ func (cs *State) catchupReplay(csHeight int64) error {
|
||||
|
||||
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
|
||||
|
||||
var msg *TimedWALMessage
|
||||
var msg *tmcon.TimedWALMessage
|
||||
dec := WALDecoder{gr}
|
||||
|
||||
LOOP:
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmos "github.com/tendermint/tendermint/libs/os"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
@@ -73,7 +74,7 @@ func (cs *State) ReplayFile(file string, console bool) error {
|
||||
defer pb.fp.Close()
|
||||
|
||||
var nextN int // apply N msgs in a row
|
||||
var msg *TimedWALMessage
|
||||
var msg *tmcon.TimedWALMessage
|
||||
for {
|
||||
if nextN == 0 && console {
|
||||
nextN = pb.replayConsoleLoop()
|
||||
@@ -147,7 +148,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
|
||||
fmt.Printf("Reseting from %d to %d\n", pb.count, count)
|
||||
pb.count = 0
|
||||
pb.cs = newCS
|
||||
var msg *TimedWALMessage
|
||||
var msg *tmcon.TimedWALMessage
|
||||
for i := 0; i < count; i++ {
|
||||
msg, err = pb.dec.Decode()
|
||||
if err == io.EOF {
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
tmevents "github.com/tendermint/tendermint/libs/events"
|
||||
@@ -78,7 +79,7 @@ type State struct {
|
||||
|
||||
// a Write-Ahead Log ensures we can recover from any kind of crash
|
||||
// and helps us avoid signing conflicting votes
|
||||
wal WAL
|
||||
wal tmcon.WAL
|
||||
replayMode bool // so we don't log signing errors during replay
|
||||
doWALCatchup bool // determines if we even try to do the catchup
|
||||
|
||||
@@ -96,7 +97,7 @@ type State struct {
|
||||
evsw tmevents.EventSwitch
|
||||
|
||||
// for reporting metrics
|
||||
metrics *Metrics
|
||||
metrics *tmcon.Metrics
|
||||
|
||||
// misbehaviors mapped for each height (can't have more than one misbehavior per height)
|
||||
misbehaviors map[int64]Misbehavior
|
||||
@@ -134,7 +135,7 @@ func NewState(
|
||||
wal: nilWAL{},
|
||||
evpool: evpool,
|
||||
evsw: tmevents.NewEventSwitch(),
|
||||
metrics: NopMetrics(),
|
||||
metrics: tmcon.NopMetrics(),
|
||||
misbehaviors: misbehaviors,
|
||||
}
|
||||
// set function defaults (may be overwritten before calling Start)
|
||||
@@ -173,7 +174,7 @@ func (cs *State) handleMsg(mi msgInfo) {
|
||||
)
|
||||
msg, peerID := mi.Msg, mi.PeerID
|
||||
switch msg := msg.(type) {
|
||||
case *ProposalMessage:
|
||||
case *tmcon.ProposalMessage:
|
||||
// will not cause transition.
|
||||
// once proposal is set, we can receive block parts
|
||||
// err = cs.setProposal(msg.Proposal)
|
||||
@@ -182,7 +183,7 @@ func (cs *State) handleMsg(mi msgInfo) {
|
||||
} else {
|
||||
err = defaultReceiveProposal(cs, msg.Proposal)
|
||||
}
|
||||
case *BlockPartMessage:
|
||||
case *tmcon.BlockPartMessage:
|
||||
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
|
||||
added, err = cs.addProposalBlockPart(msg, peerID)
|
||||
if added {
|
||||
@@ -200,7 +201,7 @@ func (cs *State) handleMsg(mi msgInfo) {
|
||||
msg.Round)
|
||||
err = nil
|
||||
}
|
||||
case *VoteMessage:
|
||||
case *tmcon.VoteMessage:
|
||||
// attempt to add the vote and dupeout the validator if its a duplicate signature
|
||||
// if the vote gives us a 2/3-any or 2/3-one, we transition
|
||||
added, err = cs.tryAddVote(msg.Vote, peerID)
|
||||
@@ -442,7 +443,7 @@ var (
|
||||
|
||||
// msgs from the reactor which may update the state
|
||||
type msgInfo struct {
|
||||
Msg Message `json:"msg"`
|
||||
Msg tmcon.Message `json:"msg"`
|
||||
PeerID p2p.ID `json:"peer_key"`
|
||||
}
|
||||
|
||||
@@ -485,7 +486,7 @@ func (cs *State) SetEventBus(b *types.EventBus) {
|
||||
}
|
||||
|
||||
// StateMetrics sets the metrics.
|
||||
func StateMetrics(metrics *Metrics) StateOption {
|
||||
func StateMetrics(metrics *tmcon.Metrics) StateOption {
|
||||
return func(cs *State) { cs.metrics = metrics }
|
||||
}
|
||||
|
||||
@@ -685,7 +686,7 @@ func (cs *State) Wait() {
|
||||
|
||||
// OpenWAL opens a file to log all consensus messages and timeouts for
|
||||
// deterministic accountability.
|
||||
func (cs *State) OpenWAL(walFile string) (WAL, error) {
|
||||
func (cs *State) OpenWAL(walFile string) (tmcon.WAL, error) {
|
||||
wal, err := NewWAL(walFile)
|
||||
if err != nil {
|
||||
cs.Logger.Error("Failed to open WAL", "file", walFile, "err", err)
|
||||
@@ -709,9 +710,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) {
|
||||
// AddVote inputs a vote.
|
||||
func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
|
||||
if peerID == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
|
||||
cs.internalMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, ""}
|
||||
} else {
|
||||
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
|
||||
cs.peerMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, peerID}
|
||||
}
|
||||
|
||||
// TODO: wait for event?!
|
||||
@@ -722,9 +723,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error
|
||||
func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
|
||||
|
||||
if peerID == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
|
||||
cs.internalMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""}
|
||||
} else {
|
||||
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
|
||||
cs.peerMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, peerID}
|
||||
}
|
||||
|
||||
// TODO: wait for event?!
|
||||
@@ -735,9 +736,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
|
||||
func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.ID) error {
|
||||
|
||||
if peerID == "" {
|
||||
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
|
||||
cs.internalMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, ""}
|
||||
} else {
|
||||
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
|
||||
cs.peerMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, peerID}
|
||||
}
|
||||
|
||||
// TODO: wait for event?!
|
||||
@@ -998,7 +999,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
|
||||
}
|
||||
|
||||
if _, ok := mi.Msg.(*VoteMessage); ok {
|
||||
if _, ok := mi.Msg.(*tmcon.VoteMessage); ok {
|
||||
// we actually want to simulate failing during
|
||||
// the previous WriteSync, but this isn't easy to do.
|
||||
// Equivalent would be to fail here and manually remove
|
||||
@@ -1210,10 +1211,10 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
|
||||
proposal.Signature = p.Signature
|
||||
|
||||
// send proposal and block parts on internal msg queue
|
||||
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""})
|
||||
for i := 0; i < int(blockParts.Total()); i++ {
|
||||
part := blockParts.GetPart(i)
|
||||
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&tmcon.BlockPartMessage{Height: cs.Height, Round: cs.Round, Part: part}, ""})
|
||||
}
|
||||
cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
|
||||
cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
|
||||
@@ -1490,7 +1491,7 @@ func (cs *State) finalizeCommit(height int64) {
|
||||
// Either way, the State should not be resumed until we
|
||||
// successfully call ApplyBlock (ie. later here, or in Handshake after
|
||||
// restart).
|
||||
endMsg := EndHeightMessage{height}
|
||||
endMsg := tmcon.EndHeightMessage{Height: height}
|
||||
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node",
|
||||
endMsg, err))
|
||||
@@ -1658,7 +1659,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
|
||||
// NOTE: block is not necessarily valid.
|
||||
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
|
||||
// once we have the full block.
|
||||
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) {
|
||||
func (cs *State) addProposalBlockPart(msg *tmcon.BlockPartMessage, peerID p2p.ID) (added bool, err error) {
|
||||
height, round, part := msg.Height, msg.Round, msg.Part
|
||||
|
||||
// Blocks might be reused, so round mismatch is OK
|
||||
@@ -1858,7 +1859,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
|
||||
// TODO: pass pubKey to signVote
|
||||
vote, err := cs.signVote(msgType, hash, header)
|
||||
if err == nil {
|
||||
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
|
||||
cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: vote}, ""})
|
||||
cs.Logger.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
|
||||
return vote
|
||||
}
|
||||
|
||||
@@ -11,8 +11,9 @@ import (
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
auto "github.com/tendermint/tendermint/libs/autofile"
|
||||
// tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
auto "github.com/tendermint/tendermint/libs/autofile"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmos "github.com/tendermint/tendermint/libs/os"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
@@ -30,44 +31,12 @@ const (
|
||||
|
||||
//--------------------------------------------------------
|
||||
// types and functions for savings consensus messages
|
||||
|
||||
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
|
||||
type TimedWALMessage struct {
|
||||
Time time.Time `json:"time"`
|
||||
Msg WALMessage `json:"msg"`
|
||||
}
|
||||
|
||||
// EndHeightMessage marks the end of the given height inside WAL.
|
||||
// @internal used by scripts/wal2json util.
|
||||
type EndHeightMessage struct {
|
||||
Height int64 `json:"height"`
|
||||
}
|
||||
|
||||
type WALMessage interface{}
|
||||
|
||||
// func init() {
|
||||
// tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo")
|
||||
// tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo")
|
||||
// tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage")
|
||||
// tmjson.RegisterType(tmcon.EndHeightMessage {}, "tendermint/wal/EndHeightMessage ")
|
||||
// }
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Simple write-ahead logger
|
||||
|
||||
// WAL is an interface for any write-ahead logger.
|
||||
type WAL interface {
|
||||
Write(WALMessage) error
|
||||
WriteSync(WALMessage) error
|
||||
FlushAndSync() error
|
||||
|
||||
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
|
||||
|
||||
// service methods
|
||||
Start() error
|
||||
Stop() error
|
||||
Wait()
|
||||
}
|
||||
|
||||
// Write ahead logger writes msgs to disk before they are processed.
|
||||
// Can be used for crash-recovery and deterministic replay.
|
||||
// TODO: currently the wal is overwritten during replay catchup, give it a mode
|
||||
@@ -84,7 +53,7 @@ type BaseWAL struct {
|
||||
flushInterval time.Duration
|
||||
}
|
||||
|
||||
var _ WAL = &BaseWAL{}
|
||||
var _ tmcon.WAL = &BaseWAL{}
|
||||
|
||||
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
|
||||
// WAL. It's flushed and synced to disk every 2s and once when stopped.
|
||||
@@ -126,7 +95,7 @@ func (wal *BaseWAL) OnStart() error {
|
||||
if err != nil {
|
||||
return err
|
||||
} else if size == 0 {
|
||||
if err := wal.WriteSync(EndHeightMessage{0}); err != nil {
|
||||
if err := wal.WriteSync(tmcon.EndHeightMessage{Height: 0}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -181,12 +150,12 @@ func (wal *BaseWAL) Wait() {
|
||||
// Write is called in newStep and for each receive on the
|
||||
// peerMsgQueue and the timeoutTicker.
|
||||
// NOTE: does not call fsync()
|
||||
func (wal *BaseWAL) Write(msg WALMessage) error {
|
||||
func (wal *BaseWAL) Write(msg tmcon.WALMessage) error {
|
||||
if wal == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
|
||||
if err := wal.enc.Encode(&tmcon.TimedWALMessage{Time: tmtime.Now(), Msg: msg}); err != nil {
|
||||
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
|
||||
"err", err, "msg", msg)
|
||||
return err
|
||||
@@ -198,7 +167,7 @@ func (wal *BaseWAL) Write(msg WALMessage) error {
|
||||
// WriteSync is called when we receive a msg from ourselves
|
||||
// so that we write to disk before sending signed messages.
|
||||
// NOTE: calls fsync()
|
||||
func (wal *BaseWAL) WriteSync(msg WALMessage) error {
|
||||
func (wal *BaseWAL) WriteSync(msg tmcon.WALMessage) error {
|
||||
if wal == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -230,9 +199,9 @@ type WALSearchOptions struct {
|
||||
// CONTRACT: caller must close group reader.
|
||||
func (wal *BaseWAL) SearchForEndHeight(
|
||||
height int64,
|
||||
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
var (
|
||||
msg *TimedWALMessage
|
||||
msg *tmcon.TimedWALMessage
|
||||
gr *auto.GroupReader
|
||||
)
|
||||
lastHeightFound := int64(-1)
|
||||
@@ -268,7 +237,7 @@ func (wal *BaseWAL) SearchForEndHeight(
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if m, ok := msg.Msg.(EndHeightMessage); ok {
|
||||
if m, ok := msg.Msg.(tmcon.EndHeightMessage); ok {
|
||||
lastHeightFound = m.Height
|
||||
if m.Height == height { // found
|
||||
wal.Logger.Info("Found", "height", height, "index", index)
|
||||
@@ -299,7 +268,7 @@ func NewWALEncoder(wr io.Writer) *WALEncoder {
|
||||
// Encode writes the custom encoding of v to the stream. It returns an error if
|
||||
// the encoded size of v is greater than 1MB. Any error encountered
|
||||
// during the write is also returned.
|
||||
func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
|
||||
func (enc *WALEncoder) Encode(v *tmcon.TimedWALMessage) error {
|
||||
pbMsg, err := WALToProto(v.Msg)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -366,7 +335,7 @@ func NewWALDecoder(rd io.Reader) *WALDecoder {
|
||||
}
|
||||
|
||||
// Decode reads the next custom-encoded value from its reader and returns it.
|
||||
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
|
||||
func (dec *WALDecoder) Decode() (*tmcon.TimedWALMessage, error) {
|
||||
b := make([]byte, 4)
|
||||
|
||||
_, err := dec.rd.Read(b)
|
||||
@@ -414,7 +383,7 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
|
||||
if err != nil {
|
||||
return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)}
|
||||
}
|
||||
tMsgWal := &TimedWALMessage{
|
||||
tMsgWal := &tmcon.TimedWALMessage{
|
||||
Time: res.Time,
|
||||
Msg: walMsg,
|
||||
}
|
||||
@@ -424,12 +393,13 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
|
||||
|
||||
type nilWAL struct{}
|
||||
|
||||
var _ WAL = nilWAL{}
|
||||
var _ tmcon.WAL = nilWAL{}
|
||||
|
||||
func (nilWAL) Write(m WALMessage) error { return nil }
|
||||
func (nilWAL) WriteSync(m WALMessage) error { return nil }
|
||||
func (nilWAL) Write(m tmcon.WALMessage) error { return nil }
|
||||
func (nilWAL) WriteSync(m tmcon.WALMessage) error { return nil }
|
||||
func (nilWAL) FlushAndSync() error { return nil }
|
||||
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
func (nilWAL) SearchForEndHeight(height int64,
|
||||
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
func (nilWAL) Start() error { return nil }
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
// +build gofuzz
|
||||
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
)
|
||||
|
||||
func Fuzz(data []byte) int {
|
||||
dec := NewWALDecoder(bytes.NewReader(data))
|
||||
for {
|
||||
msg, err := dec.Decode()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
if msg != nil {
|
||||
panic("msg != nil on error")
|
||||
}
|
||||
return 0
|
||||
}
|
||||
var w bytes.Buffer
|
||||
enc := NewWALEncoder(&w)
|
||||
err = enc.Encode(msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return 1
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
tmcon "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
@@ -98,7 +99,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
|
||||
numBlocksWritten := make(chan struct{})
|
||||
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
|
||||
// see wal.go#103
|
||||
if err := wal.Write(EndHeightMessage{0}); err != nil {
|
||||
if err := wal.Write(tmcon.EndHeightMessage{Height: 0}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -187,13 +188,13 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
|
||||
// Save writes message to the internal buffer except when heightToStop is
|
||||
// reached, in which case it will signal the caller via signalWhenStopsTo and
|
||||
// skip writing.
|
||||
func (w *byteBufferWAL) Write(m WALMessage) error {
|
||||
func (w *byteBufferWAL) Write(m tmcon.WALMessage) error {
|
||||
if w.stopped {
|
||||
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
|
||||
return nil
|
||||
}
|
||||
|
||||
if endMsg, ok := m.(EndHeightMessage); ok {
|
||||
if endMsg, ok := m.(tmcon.EndHeightMessage); ok {
|
||||
w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
|
||||
if endMsg.Height == w.heightToStop {
|
||||
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
|
||||
@@ -204,7 +205,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error {
|
||||
}
|
||||
|
||||
w.logger.Debug("WAL Write Message", "msg", m)
|
||||
err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
|
||||
err := w.enc.Encode(&tmcon.TimedWALMessage{Time: fixedTime, Msg: m})
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to encode the msg %v", m))
|
||||
}
|
||||
@@ -212,7 +213,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
|
||||
func (w *byteBufferWAL) WriteSync(m tmcon.WALMessage) error {
|
||||
return w.Write(m)
|
||||
}
|
||||
|
||||
@@ -220,7 +221,7 @@ func (w *byteBufferWAL) FlushAndSync() error { return nil }
|
||||
|
||||
func (w *byteBufferWAL) SearchForEndHeight(
|
||||
height int64,
|
||||
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -138,19 +138,19 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger, misbehaviors map[int6
|
||||
}
|
||||
|
||||
// MetricsProvider returns a consensus, p2p and mempool Metrics.
|
||||
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
|
||||
type MetricsProvider func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
|
||||
|
||||
// DefaultMetricsProvider returns Metrics build using Prometheus client library
|
||||
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
|
||||
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
||||
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
|
||||
return func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
|
||||
if config.Prometheus {
|
||||
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
return consensus.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
|
||||
}
|
||||
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
|
||||
return consensus.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,7 +427,7 @@ func createConsensusReactor(config *cfg.Config,
|
||||
mempool *mempl.CListMempool,
|
||||
evidencePool *evidence.Pool,
|
||||
privValidator types.PrivValidator,
|
||||
csMetrics *cs.Metrics,
|
||||
csMetrics *consensus.Metrics,
|
||||
waitSync bool,
|
||||
eventBus *types.EventBus,
|
||||
consensusLogger log.Logger,
|
||||
|
||||
Reference in New Issue
Block a user