Revert "consensus: p2p refactor (#5969)"

This reverts commit 16bbe8c862.
This commit is contained in:
Aleksandr Bezobchuk
2021-03-16 13:48:17 -04:00
parent efd2fde474
commit 2b60852e23
18 changed files with 3026 additions and 3791 deletions

View File

@@ -1,3 +1,3 @@
# Consensus
# Consensus
See the [consensus spec](https://github.com/tendermint/spec/tree/master/spec/consensus).
See the [consensus spec](https://github.com/tendermint/spec/tree/master/spec/consensus) and the [reactor consensus spec](https://github.com/tendermint/spec/tree/master/spec/reactors/consensus) for more information.

View File

@@ -7,47 +7,49 @@ import (
"path"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/evidence"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"
)
// Byzantine node sends two different prevotes (nil and blockID) to the same
// validator.
func TestByzantinePrevoteEquivocation(t *testing.T) {
configSetup(t)
//----------------------------------------------
// byzantine failures
nValidators := 4
prevoteHeight := int64(2)
// Byzantine node sends two different prevotes (nil and blockID) to the same validator
func TestByzantinePrevoteEquivocation(t *testing.T) {
const nValidators = 4
const byzantineNode = 0
const prevoteHeight = int64(2)
testName := "consensus_byzantine_test"
tickerFunc := newMockTickerFunc(true)
appFunc := newCounter
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
states := make([]*State, nValidators)
css := make([]*State, nValidators)
for i := 0; i < nValidators; i++ {
logger := consensusLogger().With("test", "byzantine", "validator", i)
stateDB := dbm.NewMemDB() // each state needs its own db
stateStore := sm.NewStore(stateDB)
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := appFunc()
vals := types.TM2PB.ValidatorUpdates(state.Validators)
@@ -90,174 +92,183 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
cs.SetTimeoutTicker(tickerFunc())
cs.SetLogger(logger)
states[i] = cs
css[i] = cs
}
rts := setup(t, nValidators, states, 100) // buffer must be large enough to not deadlock
// initialize the reactors for each of the validators
reactors := make([]*Reactor, nValidators)
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, nValidators)
for i := 0; i < nValidators; i++ {
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
reactors[i].SetLogger(css[i].Logger)
var bzNodeID p2p.NodeID
// eventBus is already started with the cs
eventBuses[i] = css[i].eventBus
reactors[i].SetEventBus(eventBuses[i])
// Set the first state's reactor as the dedicated byzantine reactor and grab
// the NodeID that corresponds to the state so we can reference the reactor.
bzNodeState := states[0]
for nID, s := range rts.states {
if s == bzNodeState {
bzNodeID = nID
break
blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, 100)
require.NoError(t, err)
blocksSubs = append(blocksSubs, blocksSub)
if css[i].state.LastBlockHeight == 0 { // simulate handle initChain in handshake
err = css[i].blockExec.Store().Save(css[i].state)
require.NoError(t, err)
}
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
}, p2p.Connect2Switches)
bzReactor := rts.reactors[bzNodeID]
// create byzantine validator
bcs := css[byzantineNode]
// alter prevote so that the byzantine node double votes when height is 2
bzNodeState.doPrevote = func(height int64, round int32) {
bcs.doPrevote = func(height int64, round int32) {
// allow first height to happen normally so that byzantine validator is no longer proposer
if height == prevoteHeight {
prevote1, err := bzNodeState.signVote(
tmproto.PrevoteType,
bzNodeState.ProposalBlock.Hash(),
bzNodeState.ProposalBlockParts.Header(),
)
bcs.Logger.Info("Sending two votes")
prevote1, err := bcs.signVote(tmproto.PrevoteType, bcs.ProposalBlock.Hash(), bcs.ProposalBlockParts.Header())
require.NoError(t, err)
prevote2, err := bzNodeState.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
prevote2, err := bcs.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
require.NoError(t, err)
peerList := reactors[byzantineNode].Switch.Peers().List()
bcs.Logger.Info("Getting peer list", "peers", peerList)
// send two votes to all peers (1st to one half, 2nd to another half)
i := 0
for _, ps := range bzReactor.peers {
if i < len(bzReactor.peers)/2 {
bzNodeState.Logger.Info("signed and pushed vote", "vote", prevote1, "peer", ps.peerID)
bzReactor.voteCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: prevote1.ToProto(),
},
}
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
} else {
bzNodeState.Logger.Info("signed and pushed vote", "vote", prevote2, "peer", ps.peerID)
bzReactor.voteCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: prevote2.ToProto(),
},
}
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
}
i++
}
} else {
bzNodeState.Logger.Info("behaving normally")
bzNodeState.defaultDoPrevote(height, round)
bcs.Logger.Info("Behaving normally")
bcs.defaultDoPrevote(height, round)
}
}
// Introducing a lazy proposer means that the time of the block committed is
// different to the timestamp that the other nodes have. This tests to ensure
// that the evidence that finally gets proposed will have a valid timestamp.
// lazyProposer := states[1]
lazyNodeState := states[1]
// introducing a lazy proposer means that the time of the block committed is different to the
// timestamp that the other nodes have. This tests to ensure that the evidence that finally gets
// proposed will have a valid timestamp
lazyProposer := css[1]
lazyNodeState.decideProposal = func(height int64, round int32) {
lazyNodeState.Logger.Info("Lazy Proposer proposing condensed commit")
require.NotNil(t, lazyNodeState.privValidator)
lazyProposer.decideProposal = func(height int64, round int32) {
lazyProposer.Logger.Info("Lazy Proposer proposing condensed commit")
if lazyProposer.privValidator == nil {
panic("entered createProposalBlock with privValidator being nil")
}
var commit *types.Commit
switch {
case lazyNodeState.Height == lazyNodeState.state.InitialHeight:
case lazyProposer.Height == lazyProposer.state.InitialHeight:
// We're creating a proposal for the first block.
// The commit is empty, but not nil.
commit = types.NewCommit(0, 0, types.BlockID{}, nil)
case lazyNodeState.LastCommit.HasTwoThirdsMajority():
case lazyProposer.LastCommit.HasTwoThirdsMajority():
// Make the commit from LastCommit
commit = lazyNodeState.LastCommit.MakeCommit()
commit = lazyProposer.LastCommit.MakeCommit()
default: // This shouldn't happen.
lazyNodeState.Logger.Error("enterPropose: Cannot propose anything: No commit for the previous block")
lazyProposer.Logger.Error("enterPropose: Cannot propose anything: No commit for the previous block")
return
}
// omit the last signature in the commit
commit.Signatures[len(commit.Signatures)-1] = types.NewCommitSigAbsent()
if lazyNodeState.privValidatorPubKey == nil {
if lazyProposer.privValidatorPubKey == nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
lazyNodeState.Logger.Error(fmt.Sprintf("enterPropose: %v", errPubKeyIsNotSet))
lazyProposer.Logger.Error(fmt.Sprintf("enterPropose: %v", errPubKeyIsNotSet))
return
}
proposerAddr := lazyNodeState.privValidatorPubKey.Address()
proposerAddr := lazyProposer.privValidatorPubKey.Address()
block, blockParts := lazyNodeState.blockExec.CreateProposalBlock(
lazyNodeState.Height, lazyNodeState.state, commit, proposerAddr,
block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
lazyProposer.Height, lazyProposer.state, commit, proposerAddr,
)
// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
// and the privValidator will refuse to sign anything.
if err := lazyNodeState.wal.FlushAndSync(); err != nil {
lazyNodeState.Logger.Error("Error flushing to disk")
if err := lazyProposer.wal.FlushAndSync(); err != nil {
lazyProposer.Logger.Error("Error flushing to disk")
}
// Make proposal
propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
proposal := types.NewProposal(height, round, lazyNodeState.ValidRound, propBlockID)
proposal := types.NewProposal(height, round, lazyProposer.ValidRound, propBlockID)
p := proposal.ToProto()
if err := lazyNodeState.privValidator.SignProposal(context.Background(), lazyNodeState.state.ChainID, p); err == nil {
if err := lazyProposer.privValidator.SignProposal(lazyProposer.state.ChainID, p); err == nil {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
lazyNodeState.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
lazyProposer.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
lazyNodeState.sendInternalMessage(msgInfo{&BlockPartMessage{lazyNodeState.Height, lazyNodeState.Round, part}, ""})
lazyProposer.sendInternalMessage(msgInfo{&BlockPartMessage{lazyProposer.Height, lazyProposer.Round, part}, ""})
}
lazyNodeState.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
lazyNodeState.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
} else if !lazyNodeState.replayMode {
lazyNodeState.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err)
lazyProposer.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
lazyProposer.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
} else if !lazyProposer.replayMode {
lazyProposer.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err)
}
}
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(state, false)
// start the consensus reactors
for i := 0; i < nValidators; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, false)
}
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// Evidence should be submitted and committed at the third height but
// we will check the first six just in case
evidenceFromEachValidator := make([]types.Evidence, nValidators)
wg := new(sync.WaitGroup)
i := 0
for _, sub := range rts.subs {
for i := 0; i < nValidators; i++ {
wg.Add(1)
go func(j int, s types.Subscription) {
go func(i int) {
defer wg.Done()
for msg := range s.Out() {
for msg := range blocksSubs[i].Out() {
block := msg.Data().(types.EventDataNewBlock).Block
if len(block.Evidence.Evidence) != 0 {
evidenceFromEachValidator[j] = block.Evidence.Evidence[0]
evidenceFromEachValidator[i] = block.Evidence.Evidence[0]
return
}
}
}(i, sub)
i++
}(i)
}
wg.Wait()
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
pubkey, err := bzNodeState.privValidator.GetPubKey(context.Background())
pubkey, err := bcs.privValidator.GetPubKey()
require.NoError(t, err)
for idx, ev := range evidenceFromEachValidator {
if assert.NotNil(t, ev, idx) {
ev, ok := ev.(*types.DuplicateVoteEvidence)
assert.True(t, ok)
assert.Equal(t, pubkey.Address(), ev.VoteA.ValidatorAddress)
assert.Equal(t, prevoteHeight, ev.Height())
select {
case <-done:
for idx, ev := range evidenceFromEachValidator {
if assert.NotNil(t, ev, idx) {
ev, ok := ev.(*types.DuplicateVoteEvidence)
assert.True(t, ok)
assert.Equal(t, pubkey.Address(), ev.VoteA.ValidatorAddress)
assert.Equal(t, prevoteHeight, ev.Height())
}
}
case <-time.After(20 * time.Second):
for i, reactor := range reactors {
t.Logf("Consensus Reactor %d\n%v", i, reactor)
}
t.Fatalf("Timed out waiting for validators to commit evidence")
}
}
@@ -267,260 +278,259 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// B sees a commit, A doesn't.
// Heal partition and ensure A sees the commit
func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// TODO: https://github.com/tendermint/tendermint/issues/6092
t.SkipNow()
N := 4
logger := consensusLogger().With("test", "byzantine")
app := newCounter
css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app)
defer cleanup()
// n := 4
// logger := consensusLogger().With("test", "byzantine")
// app := newCounter
// give the byzantine validator a normal ticker
ticker := NewTimeoutTicker()
ticker.SetLogger(css[0].Logger)
css[0].SetTimeoutTicker(ticker)
// states, cleanup := randConsensusState(n, "consensus_byzantine_test", newMockTickerFunc(false), app)
// t.Cleanup(cleanup)
p2pLogger := logger.With("module", "p2p")
// // give the byzantine validator a normal ticker
// ticker := NewTimeoutTicker()
// ticker.SetLogger(states[0].Logger)
// states[0].SetTimeoutTicker(ticker)
blocksSubs := make([]types.Subscription, N)
reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ {
// p2pLogger := logger.With("module", "p2p")
// enable txs so we can create different proposals
assertMempool(css[i].txNotifier).EnableTxsAvailable()
// blocksSubs := make([]types.Subscription, n)
// reactors := make([]p2p.Reactor, n)
// for i := 0; i < n; i++ {
// // enable txs so we can create different proposals
// assertMempool(states[i].txNotifier).EnableTxsAvailable()
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))
// eventBus := states[i].eventBus
// eventBus.SetLogger(logger.With("module", "events", "validator", i))
var err error
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
// var err error
// blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
// require.NoError(t, err)
conR := NewReactor(css[i], true) // so we don't start the consensus states
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)
// conR := NewReactor(states[i], true) // so we don't start the consensus states
// conR.SetLogger(logger.With("validator", i))
// conR.SetEventBus(eventBus)
var conRI p2p.Reactor = conR
// var conRI p2p.Reactor = conR
// make first val byzantine
if i == 0 {
conRI = NewByzantineReactor(conR)
}
// // make first val byzantine
// if i == 0 {
// conRI = NewByzantineReactor(conR)
// }
reactors[i] = conRI
err = css[i].blockExec.Store().Save(css[i].state) // for save height 1's validators info
require.NoError(t, err)
}
// reactors[i] = conRI
// err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info
// require.NoError(t, err)
// }
switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(p2pLogger.With("validator", i))
sw.AddReactor("CONSENSUS", reactors[i])
return sw
}, func(sws []*p2p.Switch, i, j int) {
// the network starts partitioned with globally active adversary
if i != 0 {
return
}
p2p.Connect2Switches(sws, i, j)
})
// switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
// sw.SetLogger(p2pLogger.With("validator", i))
// sw.AddReactor("CONSENSUS", reactors[i])
// return sw
// }, func(sws []*p2p.Switch, i, j int) {
// // the network starts partitioned with globally active adversary
// if i != 0 {
// return
// }
// p2p.Connect2Switches(sws, i, j)
// })
// make first val byzantine
// NOTE: Now, test validators are MockPV, which by default doesn't
// do any safety checks.
css[0].privValidator.(types.MockPV).DisableChecks()
css[0].decideProposal = func(j int32) func(int64, int32) {
return func(height int64, round int32) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
}
}(int32(0))
// We are setting the prevote function to do nothing because the prevoting
// and precommitting are done alongside the proposal.
css[0].doPrevote = func(height int64, round int32) {}
// // make first val byzantine
// // NOTE: Now, test validators are MockPV, which by default doesn't
// // do any safety checks.
// states[0].privValidator.(types.MockPV).DisableChecks()
// states[0].decideProposal = func(j int32) func(int64, int32) {
// return func(height int64, round int32) {
// byzantineDecideProposalFunc(t, height, round, states[j], switches[j])
// }
// }(int32(0))
// // We are setting the prevote function to do nothing because the prevoting
// // and precommitting are done alongside the proposal.
// states[0].doPrevote = func(height int64, round int32) {}
defer func() {
for _, sw := range switches {
err := sw.Stop()
require.NoError(t, err)
}
}()
// defer func() {
// for _, sw := range switches {
// err := sw.Stop()
// require.NoError(t, err)
// }
// }()
// start the non-byz state machines.
// note these must be started before the byz
for i := 1; i < N; i++ {
cr := reactors[i].(*Reactor)
cr.SwitchToConsensus(cr.conS.GetState(), false)
}
// // start the non-byz state machines.
// // note these must be started before the byz
// for i := 1; i < n; i++ {
// cr := reactors[i].(*Reactor)
// cr.SwitchToConsensus(cr.conS.GetState(), false)
// }
// start the byzantine state machine
byzR := reactors[0].(*ByzantineReactor)
s := byzR.reactor.conS.GetState()
byzR.reactor.SwitchToConsensus(s, false)
// // start the byzantine state machine
// byzR := reactors[0].(*ByzantineReactor)
// s := byzR.reactor.conS.GetState()
// byzR.reactor.SwitchToConsensus(s, false)
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
// note peers and switches order don't match.
peers := switches[0].Peers().List()
// // byz proposer sends one block to peers[0]
// // and the other block to peers[1] and peers[2].
// // note peers and switches order don't match.
// peers := switches[0].Peers().List()
// partition A
ind0 := getSwitchIndex(switches, peers[0])
// // partition A
// ind0 := getSwitchIndex(switches, peers[0])
// partition B
ind1 := getSwitchIndex(switches, peers[1])
ind2 := getSwitchIndex(switches, peers[2])
p2p.Connect2Switches(switches, ind1, ind2)
// // partition B
// ind1 := getSwitchIndex(switches, peers[1])
// ind2 := getSwitchIndex(switches, peers[2])
// p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition (B) to make a block
<-blocksSubs[ind2].Out()
// // wait for someone in the big partition (B) to make a block
// <-blocksSubs[ind2].Out()
t.Log("A block has been committed. Healing partition")
p2p.Connect2Switches(switches, ind0, ind1)
p2p.Connect2Switches(switches, ind0, ind2)
// t.Log("A block has been committed. Healing partition")
// p2p.Connect2Switches(switches, ind0, ind1)
// p2p.Connect2Switches(switches, ind0, ind2)
// wait till everyone makes the first new block
// (one of them already has)
wg := new(sync.WaitGroup)
for i := 1; i < N-1; i++ {
wg.Add(1)
go func(j int) {
<-blocksSubs[j].Out()
wg.Done()
}(i)
}
// // wait till everyone makes the first new block
// // (one of them already has)
// wg := new(sync.WaitGroup)
// for i := 1; i < N-1; i++ {
// wg.Add(1)
// go func(j int) {
// <-blocksSubs[j].Out()
// wg.Done()
// }(i)
// }
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
// done := make(chan struct{})
// go func() {
// wg.Wait()
// close(done)
// }()
// tick := time.NewTicker(time.Second * 10)
// select {
// case <-done:
// case <-tick.C:
// for i, reactor := range reactors {
// t.Log(fmt.Sprintf("Consensus Reactor %v", i))
// t.Log(fmt.Sprintf("%v", reactor))
// }
// t.Fatalf("Timed out waiting for all validators to commit first block")
// }
tick := time.NewTicker(time.Second * 10)
select {
case <-done:
case <-tick.C:
for i, reactor := range reactors {
t.Log(fmt.Sprintf("Consensus Reactor %v", i))
t.Log(fmt.Sprintf("%v", reactor))
}
t.Fatalf("Timed out waiting for all validators to commit first block")
}
}
// func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
// // byzantine user should create two proposals and try to split the vote.
// // Avoid sending on internalMsgQueue and running consensus state.
//-------------------------------
// byzantine consensus functions
// // Create a new proposal block from state/txs from the mempool.
// block1, blockParts1 := cs.createProposalBlock()
// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
// proposal1 := types.NewProposal(height, round, polRound, propBlockID)
// p1 := proposal1.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
// t.Error(err)
// }
func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
// byzantine user should create two proposals and try to split the vote.
// Avoid sending on internalMsgQueue and running consensus state.
// proposal1.Signature = p1.Signature
// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
p1 := proposal1.ToProto()
if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
t.Error(err)
}
// // some new transactions come in (this ensures that the proposals are different)
// deliverTxsRange(cs, 0, 1)
proposal1.Signature = p1.Signature
// // Create a new proposal block from state/txs from the mempool.
// block2, blockParts2 := cs.createProposalBlock()
// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
// proposal2 := types.NewProposal(height, round, polRound, propBlockID)
// p2 := proposal2.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
// t.Error(err)
// }
// some new transactions come in (this ensures that the proposals are different)
deliverTxsRange(cs, 0, 1)
// proposal2.Signature = p2.Signature
// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
p2 := proposal2.ToProto()
if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
t.Error(err)
}
// block1Hash := block1.Hash()
// block2Hash := block2.Hash()
proposal2.Signature = p2.Signature
// // broadcast conflicting proposals/block parts to peers
// peers := sw.Peers().List()
// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
// for i, peer := range peers {
// if i < len(peers)/2 {
// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
// } else {
// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
// }
// }
// }
block1Hash := block1.Hash()
block2Hash := block2.Hash()
// func sendProposalAndParts(
// height int64,
// round int32,
// cs *State,
// peer p2p.Peer,
// proposal *types.Proposal,
// blockHash []byte,
// parts *types.PartSet,
// ) {
// // proposal
// msg := &ProposalMessage{Proposal: proposal}
// peer.Send(DataChannel, MustEncode(msg))
// broadcast conflicting proposals/block parts to peers
peers := sw.Peers().List()
t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
for i, peer := range peers {
if i < len(peers)/2 {
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
} else {
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
}
}
}
// // parts
// for i := 0; i < int(parts.Total()); i++ {
// part := parts.GetPart(i)
// msg := &BlockPartMessage{
// Height: height, // This tells peer that this part applies to us.
// Round: round, // This tells peer that this part applies to us.
// Part: part,
// }
// peer.Send(DataChannel, MustEncode(msg))
// }
func sendProposalAndParts(
height int64,
round int32,
cs *State,
peer p2p.Peer,
proposal *types.Proposal,
blockHash []byte,
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
// // votes
// cs.mtx.Lock()
// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
// cs.mtx.Unlock()
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, MustEncode(msg))
}
// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
// }
// votes
cs.mtx.Lock()
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
// type ByzantineReactor struct {
// service.Service
// reactor *Reactor
// }
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
}
// func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
// return &ByzantineReactor{
// Service: conR,
// reactor: conR,
// }
// }
//----------------------------------------
// byzantine consensus reactor
// func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
// func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
type ByzantineReactor struct {
service.Service
reactor *Reactor
}
// func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// if !br.reactor.IsRunning() {
// return
// }
func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
return &ByzantineReactor{
Service: conR,
reactor: conR,
}
}
// // Create peerState for peer
// peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
// peer.Set(types.PeerStateKey, peerState)
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
if !br.reactor.IsRunning() {
return
}
// // Send our state to peer.
// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
// if !br.reactor.waitSync {
// br.reactor.sendNewRoundStepMessage(peer)
// }
// }
// Create peerState for peer
peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
peer.Set(types.PeerStateKey, peerState)
// func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// br.reactor.RemovePeer(peer, reason)
// }
// func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
// br.reactor.Receive(chID, peer, msgBytes)
// }
// func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
// Send our state to peer.
// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.waitSync {
br.reactor.sendNewRoundStepMessage(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

View File

@@ -31,6 +31,7 @@ import (
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
@@ -54,24 +55,6 @@ var (
ensureTimeout = time.Millisecond * 200
)
func configSetup(t *testing.T) {
t.Helper()
config = ResetConfig("consensus_reactor_test")
consensusReplayConfig = ResetConfig("consensus_replay_test")
configStateTest := ResetConfig("consensus_state_test")
configMempoolTest := ResetConfig("consensus_mempool_test")
configByzantineTest := ResetConfig("consensus_byzantine_test")
t.Cleanup(func() {
os.RemoveAll(config.RootDir)
os.RemoveAll(consensusReplayConfig.RootDir)
os.RemoveAll(configStateTest.RootDir)
os.RemoveAll(configMempoolTest.RootDir)
os.RemoveAll(configByzantineTest.RootDir)
})
}
func ensureDir(dir string, mode os.FileMode) {
if err := tmos.EnsureDir(dir, mode); err != nil {
panic(err)
@@ -108,7 +91,7 @@ func (vs *validatorStub) signVote(
hash []byte,
header types.PartSetHeader) (*types.Vote, error) {
pubKey, err := vs.PrivValidator.GetPubKey(context.Background())
pubKey, err := vs.PrivValidator.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
@@ -123,7 +106,7 @@ func (vs *validatorStub) signVote(
BlockID: types.BlockID{Hash: hash, PartSetHeader: header},
}
v := vote.ToProto()
err = vs.PrivValidator.SignVote(context.Background(), config.ChainID(), v)
err = vs.PrivValidator.SignVote(config.ChainID(), v)
vote.Signature = v.Signature
return vote, err
@@ -169,11 +152,11 @@ func (vss ValidatorStubsByPower) Len() int {
}
func (vss ValidatorStubsByPower) Less(i, j int) bool {
vssi, err := vss[i].GetPubKey(context.Background())
vssi, err := vss[i].GetPubKey()
if err != nil {
panic(err)
}
vssj, err := vss[j].GetPubKey(context.Background())
vssj, err := vss[j].GetPubKey()
if err != nil {
panic(err)
}
@@ -220,7 +203,7 @@ func decideProposal(
polRound, propBlockID := validRound, types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
proposal = types.NewProposal(height, round, polRound, propBlockID)
p := proposal.ToProto()
if err := vs.SignProposal(context.Background(), chainID, p); err != nil {
if err := vs.SignProposal(chainID, p); err != nil {
panic(err)
}
@@ -248,7 +231,7 @@ func signAddVotes(
func validatePrevote(t *testing.T, cs *State, round int32, privVal *validatorStub, blockHash []byte) {
prevotes := cs.Votes.Prevotes(round)
pubKey, err := privVal.GetPubKey(context.Background())
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
address := pubKey.Address()
var vote *types.Vote
@@ -268,7 +251,7 @@ func validatePrevote(t *testing.T, cs *State, round int32, privVal *validatorStu
func validateLastPrecommit(t *testing.T, cs *State, privVal *validatorStub, blockHash []byte) {
votes := cs.LastCommit
pv, err := privVal.GetPubKey(context.Background())
pv, err := privVal.GetPubKey()
require.NoError(t, err)
address := pv.Address()
var vote *types.Vote
@@ -290,7 +273,7 @@ func validatePrecommit(
lockedBlockHash []byte,
) {
precommits := cs.Votes.Precommits(thisRound)
pv, err := privVal.GetPubKey(context.Background())
pv, err := privVal.GetPubKey()
require.NoError(t, err)
address := pv.Address()
var vote *types.Vote
@@ -692,18 +675,11 @@ func consensusLogger() log.Logger {
}).With("module", "consensus")
}
func randConsensusState(
nValidators int,
testName string,
tickerFunc func() TimeoutTicker,
appFunc func() abci.Application,
configOpts ...func(*cfg.Config),
) ([]*State, cleanupFunc) {
func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker,
appFunc func() abci.Application, configOpts ...func(*cfg.Config)) ([]*State, cleanupFunc) {
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
logger := consensusLogger()
configRootDirs := make([]string, 0, nValidators)
for i := 0; i < nValidators; i++ {
stateDB := dbm.NewMemDB() // each state needs its own db
@@ -711,13 +687,10 @@ func randConsensusState(
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
configRootDirs = append(configRootDirs, thisConfig.RootDir)
for _, opt := range configOpts {
opt(thisConfig)
}
ensureDir(filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := appFunc()
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})
@@ -726,7 +699,6 @@ func randConsensusState(
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
return css, func() {
for _, dir := range configRootDirs {
os.RemoveAll(dir)
@@ -796,6 +768,18 @@ func randConsensusNetWithPeers(
}
}
func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int {
for i, s := range switches {
if peer.NodeInfo().ID() == s.NodeInfo().ID() {
return i
}
}
panic("didnt find peer in switches")
}
//-------------------------------------------------------------------------------
// genesis
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
@@ -823,6 +807,9 @@ func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.Sta
return s0, privValidators
}
//------------------------------------
// mock ticker
func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
return func() TimeoutTicker {
return &mockTicker{
@@ -868,6 +855,8 @@ func (m *mockTicker) Chan() <-chan timeoutInfo {
func (*mockTicker) SetLogger(log.Logger) {}
//------------------------------------
func newCounter() abci.Application {
return counter.NewApplication(true)
}

View File

@@ -1,73 +1,60 @@
package consensus
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
func TestReactorInvalidPrecommit(t *testing.T) {
configSetup(t)
//----------------------------------------------
// byzantine failures
n := 4
states, cleanup := randConsensusState(n, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
// one byz val sends a precommit for a random block at each height
// Ensure a testnet makes blocks
func TestReactorInvalidPrecommit(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
t.Cleanup(cleanup)
for i := 0; i < 4; i++ {
ticker := NewTimeoutTicker()
ticker.SetLogger(states[i].Logger)
states[i].SetTimeoutTicker(ticker)
ticker.SetLogger(css[i].Logger)
css[i].SetTimeoutTicker(ticker)
}
rts := setup(t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(state, false)
}
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
// this val sends a random precommit at each height
node := rts.network.RandomNode()
byzValIdx := 0
byzVal := css[byzValIdx]
byzR := reactors[byzValIdx]
byzState := rts.states[node.NodeID]
byzReactor := rts.reactors[node.NodeID]
// Update the doPrevote function to just send a valid precommit for a random
// block and otherwise disable the priv validator.
byzState.mtx.Lock()
privVal := byzState.privValidator
byzState.doPrevote = func(height int64, round int32) {
invalidDoPrevoteFunc(t, height, round, byzState, byzReactor, privVal)
// update the doPrevote function to just send a valid precommit for a random block
// and otherwise disable the priv validator
byzVal.mtx.Lock()
pv := byzVal.privValidator
byzVal.doPrevote = func(height int64, round int32) {
invalidDoPrevoteFunc(t, height, round, byzVal, byzR.Switch, pv)
}
byzState.mtx.Unlock()
byzVal.mtx.Unlock()
t.Cleanup(func() { stopConsensusNet(log.TestingLogger(), reactors, eventBuses) })
// wait for a bunch of blocks
//
// TODO: Make this tighter by ensuring the halt happens by block 2.
var wg sync.WaitGroup
// TODO: make this tighter by ensuring the halt happens by block 2
for i := 0; i < 10; i++ {
for _, sub := range rts.subs {
wg.Add(1)
go func(s types.Subscription) {
<-s.Out()
wg.Done()
}(sub)
}
timeoutWaitGroup(t, N, func(j int) {
<-blocksSubs[j].Out()
}, css)
}
wg.Wait()
}
func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, r *Reactor, pv types.PrivValidator) {
func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch, pv types.PrivValidator) {
// routine to:
// - precommit for a random block
// - send precommit to all peers
@@ -75,10 +62,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, r
go func() {
cs.mtx.Lock()
cs.privValidator = pv
pubKey, err := cs.privValidator.GetPubKey(context.Background())
require.NoError(t, err)
pubKey, err := cs.privValidator.GetPubKey()
if err != nil {
panic(err)
}
addr := pubKey.Address()
valIndex, _ := cs.Validators.GetByAddress(addr)
@@ -95,24 +82,19 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, r
Hash: blockHash,
PartSetHeader: types.PartSetHeader{Total: 1, Hash: tmrand.Bytes(32)}},
}
p := precommit.ToProto()
err = cs.privValidator.SignVote(context.Background(), cs.state.ChainID, p)
require.NoError(t, err)
err = cs.privValidator.SignVote(cs.state.ChainID, p)
if err != nil {
t.Error(err)
}
precommit.Signature = p.Signature
cs.privValidator = nil // disable priv val so we don't do normal votes
cs.mtx.Unlock()
for _, ps := range r.peers {
cs.Logger.Info("sending bad vote", "block", blockHash, "peer", ps.peerID)
r.voteCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: precommit.ToProto(),
},
}
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
}
}()
}

View File

@@ -25,8 +25,6 @@ func assertMempool(txn txNotifier) mempl.Mempool {
}
func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
configSetup(t)
config := ResetConfig("consensus_mempool_txs_available_test")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
@@ -47,8 +45,6 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
}
func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
configSetup(t)
config := ResetConfig("consensus_mempool_txs_available_test")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
@@ -67,8 +63,6 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
}
func TestMempoolProgressInHigherRound(t *testing.T) {
configSetup(t)
config := ResetConfig("consensus_mempool_txs_available_test")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
@@ -119,8 +113,6 @@ func deliverTxsRange(cs *State, start, end int) {
}
func TestMempoolTxConcurrentWithCommit(t *testing.T) {
configSetup(t)
state, privVals := randGenesisState(1, false, 10)
blockDB := dbm.NewMemDB()
stateStore := sm.NewStore(blockDB)
@@ -145,8 +137,6 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
}
func TestMempoolRmBadTx(t *testing.T) {
configSetup(t)
state, privVals := randGenesisState(1, false, 10)
app := NewCounterApplication()
blockDB := dbm.NewMemDB()

View File

@@ -4,9 +4,10 @@ import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmjson "github.com/tendermint/tendermint/libs/json"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
@@ -14,309 +15,7 @@ import (
"github.com/tendermint/tendermint/types"
)
// Message defines an interface that the consensus domain types implement. When
// a proto message is received on a consensus p2p Channel, it is wrapped and then
// converted to a Message via MsgFromProto.
type Message interface {
ValidateBasic() error
}
func init() {
tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage")
tmjson.RegisterType(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage")
tmjson.RegisterType(&ProposalMessage{}, "tendermint/Proposal")
tmjson.RegisterType(&ProposalPOLMessage{}, "tendermint/ProposalPOL")
tmjson.RegisterType(&BlockPartMessage{}, "tendermint/BlockPart")
tmjson.RegisterType(&VoteMessage{}, "tendermint/Vote")
tmjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote")
tmjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23")
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition
type NewRoundStepMessage struct {
Height int64
Round int32
Step cstypes.RoundStepType
SecondsSinceStartTime int64
LastCommitRound int32
}
// ValidateBasic performs basic validation.
func (m *NewRoundStepMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if !m.Step.IsValid() {
return errors.New("invalid Step")
}
// NOTE: SecondsSinceStartTime may be negative
// LastCommitRound will be -1 for the initial height, but we don't know what height this is
// since it can be specified in genesis. The reactor will have to validate this via
// ValidateHeight().
if m.LastCommitRound < -1 {
return errors.New("invalid LastCommitRound (cannot be < -1)")
}
return nil
}
// ValidateHeight validates the height given the chain's initial height.
func (m *NewRoundStepMessage) ValidateHeight(initialHeight int64) error {
if m.Height < initialHeight {
return fmt.Errorf("invalid Height %v (lower than initial height %v)",
m.Height, initialHeight)
}
if m.Height == initialHeight && m.LastCommitRound != -1 {
return fmt.Errorf("invalid LastCommitRound %v (must be -1 for initial height %v)",
m.LastCommitRound, initialHeight)
}
if m.Height > initialHeight && m.LastCommitRound < 0 {
return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint
initialHeight)
}
return nil
}
// String returns a string representation.
func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound)
}
// NewValidBlockMessage is sent when a validator observes a valid block B in some round r,
// i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r.
// In case the block is also committed, then IsCommit flag is set to true.
type NewValidBlockMessage struct {
Height int64
Round int32
BlockPartSetHeader types.PartSetHeader
BlockParts *bits.BitArray
IsCommit bool
}
// ValidateBasic performs basic validation.
func (m *NewValidBlockMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if err := m.BlockPartSetHeader.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockPartSetHeader: %v", err)
}
if m.BlockParts.Size() == 0 {
return errors.New("empty blockParts")
}
if m.BlockParts.Size() != int(m.BlockPartSetHeader.Total) {
return fmt.Errorf("blockParts bit array size %d not equal to BlockPartSetHeader.Total %d",
m.BlockParts.Size(),
m.BlockPartSetHeader.Total)
}
if m.BlockParts.Size() > int(types.MaxBlockPartsCount) {
return fmt.Errorf("blockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount)
}
return nil
}
// String returns a string representation.
func (m *NewValidBlockMessage) String() string {
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]",
m.Height, m.Round, m.BlockPartSetHeader, m.BlockParts, m.IsCommit)
}
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
Proposal *types.Proposal
}
// ValidateBasic performs basic validation.
func (m *ProposalMessage) ValidateBasic() error {
return m.Proposal.ValidateBasic()
}
// String returns a string representation.
func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal)
}
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
Height int64
ProposalPOLRound int32
ProposalPOL *bits.BitArray
}
// ValidateBasic performs basic validation.
func (m *ProposalPOLMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.ProposalPOLRound < 0 {
return errors.New("negative ProposalPOLRound")
}
if m.ProposalPOL.Size() == 0 {
return errors.New("empty ProposalPOL bit array")
}
if m.ProposalPOL.Size() > types.MaxVotesCount {
return fmt.Errorf("proposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount)
}
return nil
}
// String returns a string representation.
func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
Height int64
Round int32
Part *types.Part
}
// ValidateBasic performs basic validation.
func (m *BlockPartMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if err := m.Part.ValidateBasic(); err != nil {
return fmt.Errorf("wrong Part: %v", err)
}
return nil
}
// String returns a string representation.
func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
}
// ValidateBasic performs basic validation.
func (m *VoteMessage) ValidateBasic() error {
return m.Vote.ValidateBasic()
}
// String returns a string representation.
func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote)
}
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
Height int64
Round int32
Type tmproto.SignedMsgType
Index int32
}
// ValidateBasic performs basic validation.
func (m *HasVoteMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if m.Index < 0 {
return errors.New("negative Index")
}
return nil
}
// String returns a string representation.
func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
Height int64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
}
// ValidateBasic performs basic validation.
func (m *VoteSetMaj23Message) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
}
return nil
}
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the
// BlockID.
type VoteSetBitsMessage struct {
Height int64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
Votes *bits.BitArray
}
// ValidateBasic performs basic validation.
func (m *VoteSetBitsMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
}
// NOTE: Votes.Size() can be zero if the node does not have any
if m.Votes.Size() > types.MaxVotesCount {
return fmt.Errorf("votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount)
}
return nil
}
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}
// MsgToProto takes a consensus message type and returns the proto defined
// consensus message.
//
// TODO: This needs to be removed, but WALToProto depends on this.
// MsgToProto takes a consensus message type and returns the proto defined consensus message
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@@ -444,7 +143,7 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
return &pb, nil
}
// MsgFromProto takes a consensus proto message and returns the native go type.
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
return nil, errors.New("consensus: nil message")
@@ -570,7 +269,21 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
return pb, nil
}
// WALToProto takes a WAL message and return a proto walMessage and error.
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage
@@ -598,7 +311,6 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
case timeoutInfo:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_TimeoutInfo{
@@ -610,7 +322,6 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
case EndHeightMessage:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_EndHeight{
@@ -619,7 +330,6 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
default:
return nil, fmt.Errorf("to proto: wal message not recognized: %T", msg)
}
@@ -627,13 +337,11 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
return &pb, nil
}
// WALFromProto takes a proto wal message and return a consensus walMessage and
// error.
// WALFromProto takes a proto wal message and return a consensus walMessage and error
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
if msg == nil {
return nil, errors.New("nil WAL message")
}
var pb WALMessage
switch msg := msg.Sum.(type) {
@@ -643,7 +351,6 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
Round: msg.EventDataRoundState.Round,
Step: msg.EventDataRoundState.Step,
}
case *tmcons.WALMessage_MsgInfo:
walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
if err != nil {
@@ -660,26 +367,20 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
if err != nil {
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
}
pb = timeoutInfo{
Duration: msg.TimeoutInfo.Duration,
Height: msg.TimeoutInfo.Height,
Round: msg.TimeoutInfo.Round,
Step: cstypes.RoundStepType(tis),
}
return pb, nil
case *tmcons.WALMessage_EndHeight:
pb := EndHeightMessage{
Height: msg.EndHeight.Height,
}
return pb, nil
default:
return nil, fmt.Errorf("from proto: wal message not recognized: %T", msg)
}
return pb, nil
}

View File

@@ -1,9 +1,7 @@
package consensus
import (
"context"
"encoding/hex"
"fmt"
"math"
"testing"
"time"
@@ -12,11 +10,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/bits"
"github.com/tendermint/tendermint/libs/bytes"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
@@ -63,7 +58,7 @@ func TestMsgToProto(t *testing.T) {
pbProposal := proposal.ToProto()
pv := types.NewMockPV()
pk, err := pv.GetPubKey(context.Background())
pk, err := pv.GetPubKey()
require.NoError(t, err)
val := types.NewValidator(pk, 100)
@@ -430,309 +425,3 @@ func TestConsMsgsVectors(t *testing.T) {
})
}
}
func TestVoteSetMaj23MessageValidateBasic(t *testing.T) {
const (
validSignedMsgType tmproto.SignedMsgType = 0x01
invalidSignedMsgType tmproto.SignedMsgType = 0x03
)
validBlockID := types.BlockID{}
invalidBlockID := types.BlockID{
Hash: bytes.HexBytes{},
PartSetHeader: types.PartSetHeader{
Total: 1,
Hash: []byte{0},
},
}
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int32
messageHeight int64
testName string
messageType tmproto.SignedMsgType
messageBlockID types.BlockID
}{
{false, 0, 0, "Valid Message", validSignedMsgType, validBlockID},
{true, -1, 0, "Invalid Message", validSignedMsgType, validBlockID},
{true, 0, -1, "Invalid Message", validSignedMsgType, validBlockID},
{true, 0, 0, "Invalid Message", invalidSignedMsgType, validBlockID},
{true, 0, 0, "Invalid Message", validSignedMsgType, invalidBlockID},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := VoteSetMaj23Message{
Height: tc.messageHeight,
Round: tc.messageRound,
Type: tc.messageType,
BlockID: tc.messageBlockID,
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}
func TestVoteSetBitsMessageValidateBasic(t *testing.T) {
testCases := []struct {
malleateFn func(*VoteSetBitsMessage)
expErr string
}{
{func(msg *VoteSetBitsMessage) {}, ""},
{func(msg *VoteSetBitsMessage) { msg.Height = -1 }, "negative Height"},
{func(msg *VoteSetBitsMessage) { msg.Type = 0x03 }, "invalid Type"},
{func(msg *VoteSetBitsMessage) {
msg.BlockID = types.BlockID{
Hash: bytes.HexBytes{},
PartSetHeader: types.PartSetHeader{
Total: 1,
Hash: []byte{0},
},
}
}, "wrong BlockID: wrong PartSetHeader: wrong Hash:"},
{func(msg *VoteSetBitsMessage) { msg.Votes = bits.NewBitArray(types.MaxVotesCount + 1) },
"votes bit array is too big: 10001, max: 10000"},
}
for i, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &VoteSetBitsMessage{
Height: 1,
Round: 0,
Type: 0x01,
Votes: bits.NewBitArray(1),
BlockID: types.BlockID{},
}
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestNewRoundStepMessageValidateBasic(t *testing.T) {
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int32
messageLastCommitRound int32
messageHeight int64
testName string
messageStep cstypes.RoundStepType
}{
{false, 0, 0, 0, "Valid Message", cstypes.RoundStepNewHeight},
{true, -1, 0, 0, "Negative round", cstypes.RoundStepNewHeight},
{true, 0, 0, -1, "Negative height", cstypes.RoundStepNewHeight},
{true, 0, 0, 0, "Invalid Step", cstypes.RoundStepCommit + 1},
// The following cases will be handled by ValidateHeight
{false, 0, 0, 1, "H == 1 but LCR != -1 ", cstypes.RoundStepNewHeight},
{false, 0, -1, 2, "H > 1 but LCR < 0", cstypes.RoundStepNewHeight},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := NewRoundStepMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Step: tc.messageStep,
LastCommitRound: tc.messageLastCommitRound,
}
err := message.ValidateBasic()
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestNewRoundStepMessageValidateHeight(t *testing.T) {
initialHeight := int64(10)
testCases := []struct { // nolint: maligned
expectErr bool
messageLastCommitRound int32
messageHeight int64
testName string
}{
{false, 0, 11, "Valid Message"},
{true, 0, -1, "Negative height"},
{true, 0, 0, "Zero height"},
{true, 0, 10, "Initial height but LCR != -1 "},
{true, -1, 11, "Normal height but LCR < 0"},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := NewRoundStepMessage{
Height: tc.messageHeight,
Round: 0,
Step: cstypes.RoundStepNewHeight,
LastCommitRound: tc.messageLastCommitRound,
}
err := message.ValidateHeight(initialHeight)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestNewValidBlockMessageValidateBasic(t *testing.T) {
testCases := []struct {
malleateFn func(*NewValidBlockMessage)
expErr string
}{
{func(msg *NewValidBlockMessage) {}, ""},
{func(msg *NewValidBlockMessage) { msg.Height = -1 }, "negative Height"},
{func(msg *NewValidBlockMessage) { msg.Round = -1 }, "negative Round"},
{
func(msg *NewValidBlockMessage) { msg.BlockPartSetHeader.Total = 2 },
"blockParts bit array size 1 not equal to BlockPartSetHeader.Total 2",
},
{
func(msg *NewValidBlockMessage) {
msg.BlockPartSetHeader.Total = 0
msg.BlockParts = bits.NewBitArray(0)
},
"empty blockParts",
},
{
func(msg *NewValidBlockMessage) { msg.BlockParts = bits.NewBitArray(int(types.MaxBlockPartsCount) + 1) },
"blockParts bit array size 1602 not equal to BlockPartSetHeader.Total 1",
},
}
for i, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &NewValidBlockMessage{
Height: 1,
Round: 0,
BlockPartSetHeader: types.PartSetHeader{
Total: 1,
},
BlockParts: bits.NewBitArray(1),
}
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestProposalPOLMessageValidateBasic(t *testing.T) {
testCases := []struct {
malleateFn func(*ProposalPOLMessage)
expErr string
}{
{func(msg *ProposalPOLMessage) {}, ""},
{func(msg *ProposalPOLMessage) { msg.Height = -1 }, "negative Height"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOLRound = -1 }, "negative ProposalPOLRound"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOL = bits.NewBitArray(0) }, "empty ProposalPOL bit array"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOL = bits.NewBitArray(types.MaxVotesCount + 1) },
"proposalPOL bit array is too big: 10001, max: 10000"},
}
for i, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &ProposalPOLMessage{
Height: 1,
ProposalPOLRound: 1,
ProposalPOL: bits.NewBitArray(1),
}
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestBlockPartMessageValidateBasic(t *testing.T) {
testPart := new(types.Part)
testPart.Proof.LeafHash = tmhash.Sum([]byte("leaf"))
testCases := []struct {
testName string
messageHeight int64
messageRound int32
messagePart *types.Part
expectErr bool
}{
{"Valid Message", 0, 0, testPart, false},
{"Invalid Message", -1, 0, testPart, true},
{"Invalid Message", 0, -1, testPart, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := BlockPartMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Part: tc.messagePart,
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
message := BlockPartMessage{Height: 0, Round: 0, Part: new(types.Part)}
message.Part.Index = 1
assert.Equal(t, true, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
}
func TestHasVoteMessageValidateBasic(t *testing.T) {
const (
validSignedMsgType tmproto.SignedMsgType = 0x01
invalidSignedMsgType tmproto.SignedMsgType = 0x03
)
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int32
messageIndex int32
messageHeight int64
testName string
messageType tmproto.SignedMsgType
}{
{false, 0, 0, 0, "Valid Message", validSignedMsgType},
{true, -1, 0, 0, "Invalid Message", validSignedMsgType},
{true, 0, -1, 0, "Invalid Message", validSignedMsgType},
{true, 0, 0, 0, "Invalid Message", invalidSignedMsgType},
{true, 0, 0, -1, "Invalid Message", validSignedMsgType},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := HasVoteMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Type: tc.messageType,
Index: tc.messageIndex,
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}

View File

@@ -1,528 +0,0 @@
package consensus
import (
"errors"
"fmt"
"sync"
"time"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
var (
ErrPeerStateHeightRegression = errors.New("peer state height regression")
ErrPeerStateInvalidStartTime = errors.New("peer state invalid startTime")
)
// peerStateStats holds internal statistics for a peer.
type peerStateStats struct {
Votes int `json:"votes"`
BlockParts int `json:"block_parts"`
}
func (pss peerStateStats) String() string {
return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.Votes, pss.BlockParts)
}
// PeerState contains the known state of a peer, including its connection and
// threadsafe access to its PeerRoundState.
// NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
// Be mindful of what you Expose.
type PeerState struct {
peerID p2p.NodeID
logger log.Logger
// NOTE: Modify below using setters, never directly.
mtx tmsync.RWMutex
running bool
PRS cstypes.PeerRoundState `json:"round_state"`
Stats *peerStateStats `json:"stats"`
broadcastWG sync.WaitGroup
closer *tmsync.Closer
}
// NewPeerState returns a new PeerState for the given node ID.
func NewPeerState(logger log.Logger, peerID p2p.NodeID) *PeerState {
return &PeerState{
peerID: peerID,
logger: logger,
closer: tmsync.NewCloser(),
PRS: cstypes.PeerRoundState{
Round: -1,
ProposalPOLRound: -1,
LastCommitRound: -1,
CatchupCommitRound: -1,
},
Stats: &peerStateStats{},
}
}
// SetRunning sets the running state of the peer.
func (ps *PeerState) SetRunning(v bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.running = v
}
// IsRunning returns true if a PeerState is considered running where multiple
// broadcasting goroutines exist for the peer.
func (ps *PeerState) IsRunning() bool {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
return ps.running
}
// GetRoundState returns a shallow copy of the PeerRoundState. There's no point
// in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
ps.mtx.Lock()
defer ps.mtx.Unlock()
prs := ps.PRS // copy
return &prs
}
// ToJSON returns a json of PeerState.
func (ps *PeerState) ToJSON() ([]byte, error) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return tmjson.Marshal(ps)
}
// GetHeight returns an atomic snapshot of the PeerRoundState's height used by
// the mempool to ensure peers are caught up before broadcasting new txs.
func (ps *PeerState) GetHeight() int64 {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.PRS.Height
}
// SetHasProposal sets the given proposal as known for the peer.
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
return
}
if ps.PRS.Proposal {
return
}
ps.PRS.Proposal = true
// ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
if ps.PRS.ProposalBlockParts != nil {
return
}
ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total))
ps.PRS.ProposalPOLRound = proposal.POLRound
ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}
// InitProposalBlockParts initializes the peer's proposal block parts header
// and bit array.
func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.ProposalBlockParts != nil {
return
}
ps.PRS.ProposalBlockPartSetHeader = partSetHeader
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total))
}
// SetHasProposalBlockPart sets the given block part index as known for the peer.
func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != height || ps.PRS.Round != round {
return
}
ps.PRS.ProposalBlockParts.SetIndex(index, true)
}
// PickVoteToSend picks a vote to send to the peer. It will return true if a
// vote was picked.
//
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (*types.Vote, bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if votes.Size() == 0 {
return nil, false
}
var (
height = votes.GetHeight()
round = votes.GetRound()
votesType = tmproto.SignedMsgType(votes.Type())
size = votes.Size()
)
// lazily set data using 'votes'
if votes.IsCommit() {
ps.ensureCatchupCommitRound(height, round, size)
}
ps.ensureVoteBitArrays(height, size)
psVotes := ps.getVoteBitArray(height, round, votesType)
if psVotes == nil {
return nil, false // not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
return votes.GetByIndex(int32(index)), true
}
return nil, false
}
func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray {
if !types.IsVoteTypeValid(votesType) {
return nil
}
if ps.PRS.Height == height {
if ps.PRS.Round == round {
switch votesType {
case tmproto.PrevoteType:
return ps.PRS.Prevotes
case tmproto.PrecommitType:
return ps.PRS.Precommits
}
}
if ps.PRS.CatchupCommitRound == round {
switch votesType {
case tmproto.PrevoteType:
return nil
case tmproto.PrecommitType:
return ps.PRS.CatchupCommit
}
}
if ps.PRS.ProposalPOLRound == round {
switch votesType {
case tmproto.PrevoteType:
return ps.PRS.ProposalPOL
case tmproto.PrecommitType:
return nil
}
}
return nil
}
if ps.PRS.Height == height+1 {
if ps.PRS.LastCommitRound == round {
switch votesType {
case tmproto.PrevoteType:
return nil
case tmproto.PrecommitType:
return ps.PRS.LastCommit
}
}
return nil
}
return nil
}
// 'round': A round for which we have a +2/3 commit.
func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) {
if ps.PRS.Height != height {
return
}
/*
NOTE: This is wrong, 'round' could change.
e.g. if orig round is not the same as block LastCommit round.
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
panic(fmt.Sprintf(
"Conflicting CatchupCommitRound. Height: %v,
Orig: %v,
New: %v",
height,
ps.CatchupCommitRound,
round))
}
*/
if ps.PRS.CatchupCommitRound == round {
return // Nothing to do!
}
ps.PRS.CatchupCommitRound = round
if round == ps.PRS.Round {
ps.PRS.CatchupCommit = ps.PRS.Precommits
} else {
ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
}
}
// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.ensureVoteBitArrays(height, numValidators)
}
func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
if ps.PRS.Height == height {
if ps.PRS.Prevotes == nil {
ps.PRS.Prevotes = bits.NewBitArray(numValidators)
}
if ps.PRS.Precommits == nil {
ps.PRS.Precommits = bits.NewBitArray(numValidators)
}
if ps.PRS.CatchupCommit == nil {
ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
}
if ps.PRS.ProposalPOL == nil {
ps.PRS.ProposalPOL = bits.NewBitArray(numValidators)
}
} else if ps.PRS.Height == height+1 {
if ps.PRS.LastCommit == nil {
ps.PRS.LastCommit = bits.NewBitArray(numValidators)
}
}
}
// RecordVote increments internal votes related statistics for this peer.
// It returns the total number of added votes.
func (ps *PeerState) RecordVote() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.Stats.Votes++
return ps.Stats.Votes
}
// VotesSent returns the number of blocks for which peer has been sending us
// votes.
func (ps *PeerState) VotesSent() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.Stats.Votes
}
// RecordBlockPart increments internal block part related statistics for this peer.
// It returns the total number of added block parts.
func (ps *PeerState) RecordBlockPart() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.Stats.BlockParts++
return ps.Stats.BlockParts
}
// BlockPartsSent returns the number of useful block parts the peer has sent us.
func (ps *PeerState) BlockPartsSent() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.Stats.BlockParts
}
// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}
func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) {
logger := ps.logger.With(
"peerH/R", fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
"H/R", fmt.Sprintf("%d/%d", height, round),
)
logger.Debug("setHasVote", "type", voteType, "index", index)
// NOTE: some may be nil BitArrays -> no side effects
psVotes := ps.getVoteBitArray(height, round, voteType)
if psVotes != nil {
psVotes.SetIndex(int(index), true)
}
}
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
// ignore duplicates or decreases
if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
return
}
var (
psHeight = ps.PRS.Height
psRound = ps.PRS.Round
psCatchupCommitRound = ps.PRS.CatchupCommitRound
psCatchupCommit = ps.PRS.CatchupCommit
startTime = tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
)
ps.PRS.Height = msg.Height
ps.PRS.Round = msg.Round
ps.PRS.Step = msg.Step
ps.PRS.StartTime = startTime
if psHeight != msg.Height || psRound != msg.Round {
ps.PRS.Proposal = false
ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{}
ps.PRS.ProposalBlockParts = nil
ps.PRS.ProposalPOLRound = -1
ps.PRS.ProposalPOL = nil
// we'll update the BitArray capacity later
ps.PRS.Prevotes = nil
ps.PRS.Precommits = nil
}
if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
// Peer caught up to CatchupCommitRound.
// Preserve psCatchupCommit!
// NOTE: We prefer to use prs.Precommits if
// pr.Round matches pr.CatchupCommitRound.
ps.PRS.Precommits = psCatchupCommit
}
if psHeight != msg.Height {
// shift Precommits to LastCommit
if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
ps.PRS.LastCommitRound = msg.LastCommitRound
ps.PRS.LastCommit = ps.PRS.Precommits
} else {
ps.PRS.LastCommitRound = msg.LastCommitRound
ps.PRS.LastCommit = nil
}
// we'll update the BitArray capacity later
ps.PRS.CatchupCommitRound = -1
ps.PRS.CatchupCommit = nil
}
}
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
if ps.PRS.Round != msg.Round && !msg.IsCommit {
return
}
ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader
ps.PRS.ProposalBlockParts = msg.BlockParts
}
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound {
return
}
// TODO: Merge onto existing ps.PRS.ProposalPOL?
// We might have sent some prevotes in the meantime.
ps.PRS.ProposalPOL = msg.ProposalPOL
}
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}
// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
if votes != nil {
if ourVotes == nil {
votes.Update(msg.Votes)
} else {
otherVotes := votes.Sub(ourVotes)
hasVotes := otherVotes.Or(msg.Votes)
votes.Update(hasVotes)
}
}
}
// String returns a string representation of the PeerState
func (ps *PeerState) String() string {
return ps.StringIndented("")
}
// StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return fmt.Sprintf(`PeerState{
%s Key %v
%s RoundState %v
%s Stats %v
%s}`,
indent, ps.peerID,
indent, ps.PRS.StringIndented(indent+" "),
indent, ps.Stats,
indent,
)
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -34,6 +34,21 @@ import (
"github.com/tendermint/tendermint/types"
)
func TestMain(m *testing.M) {
config = ResetConfig("consensus_reactor_test")
consensusReplayConfig = ResetConfig("consensus_replay_test")
configStateTest := ResetConfig("consensus_state_test")
configMempoolTest := ResetConfig("consensus_mempool_test")
configByzantineTest := ResetConfig("consensus_byzantine_test")
code := m.Run()
os.RemoveAll(config.RootDir)
os.RemoveAll(consensusReplayConfig.RootDir)
os.RemoveAll(configStateTest.RootDir)
os.RemoveAll(configMempoolTest.RootDir)
os.RemoveAll(configByzantineTest.RootDir)
os.Exit(code)
}
// These tests ensure we can always recover from failure at any part of the consensus process.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// Only the latter interacts with the app and store,
@@ -306,8 +321,6 @@ var modes = []uint{0, 1, 2, 3}
// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay
func TestSimulateValidatorsChange(t *testing.T) {
configSetup(t)
nPeers := 7
nVals := 4
css, genDoc, config, cleanup := randConsensusNetWithPeers(
@@ -343,7 +356,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
// HEIGHT 2
height++
incrementHeight(vss...)
newValidatorPubKey1, err := css[nVals].privValidator.GetPubKey(context.Background())
newValidatorPubKey1, err := css[nVals].privValidator.GetPubKey()
require.NoError(t, err)
valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1)
require.NoError(t, err)
@@ -356,7 +369,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
proposal := types.NewProposal(vss[1].Height, round, -1, blockID)
p := proposal.ToProto()
if err := vss[1].SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vss[1].SignProposal(config.ChainID(), p); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
proposal.Signature = p.Signature
@@ -373,7 +386,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
// HEIGHT 3
height++
incrementHeight(vss...)
updateValidatorPubKey1, err := css[nVals].privValidator.GetPubKey(context.Background())
updateValidatorPubKey1, err := css[nVals].privValidator.GetPubKey()
require.NoError(t, err)
updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
@@ -386,7 +399,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
proposal = types.NewProposal(vss[2].Height, round, -1, blockID)
p = proposal.ToProto()
if err := vss[2].SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vss[2].SignProposal(config.ChainID(), p); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
proposal.Signature = p.Signature
@@ -403,14 +416,14 @@ func TestSimulateValidatorsChange(t *testing.T) {
// HEIGHT 4
height++
incrementHeight(vss...)
newValidatorPubKey2, err := css[nVals+1].privValidator.GetPubKey(context.Background())
newValidatorPubKey2, err := css[nVals+1].privValidator.GetPubKey()
require.NoError(t, err)
newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2)
require.NoError(t, err)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey(context.Background())
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3)
require.NoError(t, err)
@@ -426,10 +439,10 @@ func TestSimulateValidatorsChange(t *testing.T) {
valIndexFn := func(cssIdx int) int {
for i, vs := range newVss {
vsPubKey, err := vs.GetPubKey(context.Background())
vsPubKey, err := vs.GetPubKey()
require.NoError(t, err)
cssPubKey, err := css[cssIdx].privValidator.GetPubKey(context.Background())
cssPubKey, err := css[cssIdx].privValidator.GetPubKey()
require.NoError(t, err)
if vsPubKey.Equals(cssPubKey) {
@@ -443,7 +456,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
proposal = types.NewProposal(vss[3].Height, round, -1, blockID)
p = proposal.ToProto()
if err := vss[3].SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vss[3].SignProposal(config.ChainID(), p); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
proposal.Signature = p.Signature
@@ -502,7 +515,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
selfIndex = valIndexFn(0)
proposal = types.NewProposal(vss[1].Height, round, -1, blockID)
p = proposal.ToProto()
if err := vss[1].SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vss[1].SignProposal(config.ChainID(), p); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
proposal.Signature = p.Signature
@@ -531,8 +544,6 @@ func TestSimulateValidatorsChange(t *testing.T) {
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, false)
}
@@ -543,8 +554,6 @@ func TestHandshakeReplayAll(t *testing.T) {
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, false)
}
@@ -555,8 +564,6 @@ func TestHandshakeReplaySome(t *testing.T) {
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, false)
}
@@ -567,8 +574,6 @@ func TestHandshakeReplayOne(t *testing.T) {
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, false)
}
@@ -579,8 +584,6 @@ func TestHandshakeReplayNone(t *testing.T) {
// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx
func TestMockProxyApp(t *testing.T) {
configSetup(t)
sim.CleanupFunc() // clean the test env created in TestSimulateValidatorsChange
logger := log.TestingLogger()
var validTxs, invalidTxs = 0, 0
@@ -672,8 +675,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
privVal, err := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
require.NoError(t, err)
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
wal, err := NewWAL(walFile)
require.NoError(t, err)
@@ -687,7 +689,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
})
chain, commits, err = makeBlockchainFromWAL(wal)
require.NoError(t, err)
pubKey, err := privVal.GetPubKey(context.Background())
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
stateDB, genesisState, store = stateAndStore(config, pubKey, kvstore.ProtocolVersion)
@@ -885,10 +887,9 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - 0x03
config := ResetConfig("handshake_test_")
t.Cleanup(func() { os.RemoveAll(config.RootDir) })
privVal, err := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
require.NoError(t, err)
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
const appVersion = 0x0
pubKey, err := privVal.GetPubKey(context.Background())
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
stateDB, state, store := stateAndStore(config, pubKey, appVersion)
stateStore := sm.NewStore(stateDB)
@@ -1222,9 +1223,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
config := ResetConfig("handshake_test_")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
privVal, err := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
require.NoError(t, err)
pubKey, err := privVal.GetPubKey(context.Background())
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
stateDB, state, store := stateAndStore(config, pubKey, 0x0)
stateStore := sm.NewStore(stateDB)

File diff suppressed because it is too large Load Diff

View File

@@ -56,8 +56,6 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh
// ProposeSuite
func TestStateProposerSelection0(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
height, round := cs1.Height, cs1.Round
@@ -71,7 +69,7 @@ func TestStateProposerSelection0(t *testing.T) {
// Commit a block and ensure proposer for the next height is correct.
prop := cs1.GetRoundState().Validators.GetProposer()
pv, err := cs1.privValidator.GetPubKey(context.Background())
pv, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
address := pv.Address()
if !bytes.Equal(prop.Address, address) {
@@ -88,7 +86,7 @@ func TestStateProposerSelection0(t *testing.T) {
ensureNewRound(newRoundCh, height+1, 0)
prop = cs1.GetRoundState().Validators.GetProposer()
pv1, err := vss[1].GetPubKey(context.Background())
pv1, err := vss[1].GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
if !bytes.Equal(prop.Address, addr) {
@@ -98,8 +96,6 @@ func TestStateProposerSelection0(t *testing.T) {
// Now let's do it all again, but starting from round 2 instead of 0
func TestStateProposerSelection2(t *testing.T) {
configSetup(t)
cs1, vss := randState(4) // test needs more work for more than 3 validators
height := cs1.Height
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
@@ -116,7 +112,7 @@ func TestStateProposerSelection2(t *testing.T) {
// everyone just votes nil. we get a new proposer each round
for i := int32(0); int(i) < len(vss); i++ {
prop := cs1.GetRoundState().Validators.GetProposer()
pvk, err := vss[int(i+round)%len(vss)].GetPubKey(context.Background())
pvk, err := vss[int(i+round)%len(vss)].GetPubKey()
require.NoError(t, err)
addr := pvk.Address()
correctProposer := addr
@@ -137,8 +133,6 @@ func TestStateProposerSelection2(t *testing.T) {
// a non-validator should timeout into the prevote round
func TestStateEnterProposeNoPrivValidator(t *testing.T) {
configSetup(t)
cs, _ := randState(1)
cs.SetPrivValidator(nil)
height, round := cs.Height, cs.Round
@@ -158,8 +152,6 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) {
// a validator should not timeout of the prevote round (TODO: unless the block is really big!)
func TestStateEnterProposeYesPrivValidator(t *testing.T) {
configSetup(t)
cs, _ := randState(1)
height, round := cs.Height, cs.Round
@@ -190,8 +182,6 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) {
}
func TestStateBadProposal(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
height, round := cs1.Height, cs1.Round
vs2 := vss[1]
@@ -218,7 +208,7 @@ func TestStateBadProposal(t *testing.T) {
blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
proposal := types.NewProposal(vs2.Height, round, -1, blockID)
p := proposal.ToProto()
if err := vs2.SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vs2.SignProposal(config.ChainID(), p); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
@@ -250,8 +240,6 @@ func TestStateBadProposal(t *testing.T) {
}
func TestStateOversizedBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
cs1.state.ConsensusParams.Block.MaxBytes = 2000
height, round := cs1.Height, cs1.Round
@@ -274,7 +262,7 @@ func TestStateOversizedBlock(t *testing.T) {
blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
proposal := types.NewProposal(height, round, -1, blockID)
p := proposal.ToProto()
if err := vs2.SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vs2.SignProposal(config.ChainID(), p); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
proposal.Signature = p.Signature
@@ -314,8 +302,6 @@ func TestStateOversizedBlock(t *testing.T) {
// propose, prevote, and precommit a block
func TestStateFullRound1(t *testing.T) {
configSetup(t)
cs, vss := randState(1)
height, round := cs.Height, cs.Round
@@ -356,8 +342,6 @@ func TestStateFullRound1(t *testing.T) {
// nil is proposed, so prevote and precommit nil
func TestStateFullRoundNil(t *testing.T) {
configSetup(t)
cs, vss := randState(1)
height, round := cs.Height, cs.Round
@@ -376,8 +360,6 @@ func TestStateFullRoundNil(t *testing.T) {
// run through propose, prevote, precommit commit with two validators
// where the first validator has to wait for votes from the second
func TestStateFullRound2(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
vs2 := vss[1]
height, round := cs1.Height, cs1.Round
@@ -418,8 +400,6 @@ func TestStateFullRound2(t *testing.T) {
// two validators, 4 rounds.
// two vals take turns proposing. val1 locks on first one, precommits nil on everything else
func TestStateLockNoPOL(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
vs2 := vss[1]
height, round := cs1.Height, cs1.Round
@@ -606,8 +586,6 @@ func TestStateLockNoPOL(t *testing.T) {
// in round two: v1 prevotes the same block that the node is locked on
// the others prevote a new block hence v1 changes lock and precommits the new block with the others
func TestStateLockPOLRelock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -616,7 +594,7 @@ func TestStateLockPOLRelock(t *testing.T) {
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -705,8 +683,6 @@ func TestStateLockPOLRelock(t *testing.T) {
// 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka
func TestStateLockPOLUnlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -717,7 +693,7 @@ func TestStateLockPOLUnlock(t *testing.T) {
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -799,8 +775,6 @@ func TestStateLockPOLUnlock(t *testing.T) {
// v1 should unlock and precommit nil. In the third round another block is proposed, all vals
// prevote and now v1 can lock onto the third block and precommit that
func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -809,7 +783,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -927,8 +901,6 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
// then a polka at round 2 that we lock on
// then we see the polka from round 1 but shouldn't unlock
func TestStateLockPOLSafety1(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -939,7 +911,7 @@ func TestStateLockPOLSafety1(t *testing.T) {
timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1048,8 +1020,6 @@ func TestStateLockPOLSafety1(t *testing.T) {
// What we want:
// dont see P0, lock on P1 at R1, dont unlock using P0 at R2
func TestStateLockPOLSafety2(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -1060,7 +1030,7 @@ func TestStateLockPOLSafety2(t *testing.T) {
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1115,7 +1085,7 @@ func TestStateLockPOLSafety2(t *testing.T) {
// in round 2 we see the polkad block from round 0
newProp := types.NewProposal(height, round, 0, propBlockID0)
p := newProp.ToProto()
if err := vs3.SignProposal(context.Background(), config.ChainID(), p); err != nil {
if err := vs3.SignProposal(config.ChainID(), p); err != nil {
t.Fatal(err)
}
@@ -1147,8 +1117,6 @@ func TestStateLockPOLSafety2(t *testing.T) {
// What we want:
// P0 proposes B0 at R3.
func TestProposeValidBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -1160,7 +1128,7 @@ func TestProposeValidBlock(t *testing.T) {
timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1239,8 +1207,6 @@ func TestProposeValidBlock(t *testing.T) {
// What we want:
// P0 miss to lock B but set valid block to B after receiving delayed prevote.
func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -1251,7 +1217,7 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
validBlockCh := subscribe(cs1.eventBus, types.EventQueryValidBlock)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1303,8 +1269,6 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
// P0 miss to lock B as Proposal Block is missing, but set valid block to B after
// receiving delayed Block Proposal.
func TestSetValidBlockOnDelayedProposal(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -1315,7 +1279,7 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) {
timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
validBlockCh := subscribe(cs1.eventBus, types.EventQueryValidBlock)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1361,8 +1325,6 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) {
// What we want:
// P0 waits for timeoutPrecommit before starting next round
func TestWaitingTimeoutOnNilPolka(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -1384,15 +1346,13 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) {
// What we want:
// P0 waits for timeoutPropose in the next round before entering prevote
func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1422,15 +1382,13 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
// What we want:
// P0 jump to higher round, precommit and start precommit wait
func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1460,15 +1418,13 @@ func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) {
// What we want:
// P0 wait for timeoutPropose to expire before sending prevote.
func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1489,8 +1445,6 @@ func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
// What we want:
// P0 emit NewValidBlock event upon receiving 2/3+ Precommit for B but hasn't received block B yet
func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@@ -1525,8 +1479,6 @@ func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) {
// P0 receives 2/3+ Precommit for B for round 0, while being in round 1. It emits NewValidBlock event.
// After receiving block, it executes block and moves to the next height.
func TestCommitFromPreviousRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@@ -1580,8 +1532,6 @@ func (n *fakeTxNotifier) Notify() {
// and third precommit arrives which leads to the commit of that header and the correct
// start of the next round
func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
configSetup(t)
config.Consensus.SkipTimeoutCommit = false
cs1, vss := randState(4)
cs1.txNotifier = &fakeTxNotifier{ch: make(chan struct{})}
@@ -1595,7 +1545,7 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
newBlockHeader := subscribe(cs1.eventBus, types.EventQueryNewBlockHeader)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1643,8 +1593,6 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
}
func TestResetTimeoutPrecommitUponNewHeight(t *testing.T) {
configSetup(t)
config.Consensus.SkipTimeoutCommit = false
cs1, vss := randState(4)
@@ -1657,7 +1605,7 @@ func TestResetTimeoutPrecommitUponNewHeight(t *testing.T) {
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
newBlockHeader := subscribe(cs1.eventBus, types.EventQueryNewBlockHeader)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1787,8 +1735,6 @@ func TestStateSlashingPrecommits(t *testing.T) {
// 4 vals.
// we receive a final precommit after going into next round, but others might have gone to commit already!
func TestStateHalt1(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@@ -1798,7 +1744,7 @@ func TestStateHalt1(t *testing.T) {
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock)
pv1, err := cs1.privValidator.GetPubKey(context.Background())
pv1, err := cs1.privValidator.GetPubKey()
require.NoError(t, err)
addr := pv1.Address()
voteCh := subscribeToVoter(cs1, addr)
@@ -1856,8 +1802,6 @@ func TestStateHalt1(t *testing.T) {
}
func TestStateOutputsBlockPartsStats(t *testing.T) {
configSetup(t)
// create dummy peer
cs, _ := randState(1)
peer := p2pmock.NewPeer(nil)
@@ -1901,8 +1845,6 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
}
func TestStateOutputVoteStats(t *testing.T) {
configSetup(t)
cs, vss := randState(2)
// create dummy peer
peer := p2pmock.NewPeer(nil)

View File

@@ -1,7 +1,6 @@
package types
import (
"context"
"fmt"
"os"
"testing"
@@ -58,7 +57,7 @@ func TestPeerCatchupRounds(t *testing.T) {
func makeVoteHR(t *testing.T, height int64, valIndex, round int32, privVals []types.PrivValidator) *types.Vote {
privVal := privVals[valIndex]
pubKey, err := privVal.GetPubKey(context.Background())
pubKey, err := privVal.GetPubKey()
if err != nil {
panic(err)
}
@@ -77,7 +76,7 @@ func makeVoteHR(t *testing.T, height int64, valIndex, round int32, privVals []ty
chainID := config.ChainID()
v := vote.ToProto()
err = privVal.SignVote(context.Background(), chainID, v)
err = privVal.SignVote(chainID, v)
if err != nil {
panic(fmt.Sprintf("Error signing vote: %v", err))
}

View File

@@ -516,27 +516,27 @@ func createConsensusReactor(
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
// var (
// channels map[p2p.ChannelID]*p2p.Channel
// peerUpdates *p2p.PeerUpdates
// )
if useLegacyP2P {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
// if useLegacyP2P {
// channels = getChannelsFromShim(reactorShim)
// peerUpdates = reactorShim.PeerUpdates
// } else {
// channels = makeChannelsFromShims(router, cs.ChannelShims)
// peerUpdates = peerManager.Subscribe()
// }
reactor := cs.NewReactor(
logger,
// logger,
consensusState,
channels[cs.StateChannel],
channels[cs.DataChannel],
channels[cs.VoteChannel],
channels[cs.VoteSetBitsChannel],
peerUpdates,
// channels[cs.StateChannel],
// channels[cs.DataChannel],
// channels[cs.VoteChannel],
// channels[cs.VoteSetBitsChannel],
// peerUpdates,
waitSync,
cs.ReactorMetrics(csMetrics),
)

View File

@@ -1,80 +0,0 @@
package consensus
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
)
// Wrap implements the p2p Wrapper interface and wraps a consensus proto message.
func (m *Message) Wrap(pb proto.Message) error {
switch msg := pb.(type) {
case *NewRoundStep:
m.Sum = &Message_NewRoundStep{NewRoundStep: msg}
case *NewValidBlock:
m.Sum = &Message_NewValidBlock{NewValidBlock: msg}
case *Proposal:
m.Sum = &Message_Proposal{Proposal: msg}
case *ProposalPOL:
m.Sum = &Message_ProposalPol{ProposalPol: msg}
case *BlockPart:
m.Sum = &Message_BlockPart{BlockPart: msg}
case *Vote:
m.Sum = &Message_Vote{Vote: msg}
case *HasVote:
m.Sum = &Message_HasVote{HasVote: msg}
case *VoteSetMaj23:
m.Sum = &Message_VoteSetMaj23{VoteSetMaj23: msg}
case *VoteSetBits:
m.Sum = &Message_VoteSetBits{VoteSetBits: msg}
default:
return fmt.Errorf("unknown message: %T", msg)
}
return nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_NewRoundStep:
return m.GetNewRoundStep(), nil
case *Message_NewValidBlock:
return m.GetNewValidBlock(), nil
case *Message_Proposal:
return m.GetProposal(), nil
case *Message_ProposalPol:
return m.GetProposalPol(), nil
case *Message_BlockPart:
return m.GetBlockPart(), nil
case *Message_Vote:
return m.GetVote(), nil
case *Message_HasVote:
return m.GetHasVote(), nil
case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -7,7 +7,7 @@ import (
proto "github.com/gogo/protobuf/proto"
)
// Wrap implements the p2p Wrapper interface and wraps a state sync proto message.
// Wrap implements the p2p Wrapper interface and wraps a state sync messages.
func (m *Message) Wrap(pb proto.Message) error {
switch msg := pb.(type) {
case *ChunkRequest:
@@ -30,7 +30,7 @@ func (m *Message) Wrap(pb proto.Message) error {
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
// proto message.
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_ChunkRequest:

View File

@@ -68,8 +68,8 @@ const (
chunkMsgSize = int(16e6)
)
// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
// Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
// for other nodes.
type Reactor struct {
service.BaseService