mirror of
https://github.com/tendermint/tendermint.git
synced 2025-12-23 06:15:19 +00:00
* p2p: add a per-message type send and receive metric (#9622) * p2p: resurrect the p2p envelope and use it to calculate message metric Add new SendEnvelope, TrySendEnvelope, BroadcastEnvelope, and ReceiveEnvelope methods in the p2p package to work with the new envelope type. Care was taken to ensure this was performed in a non-breaking manner. Co-authored-by: William Banfield <4561443+williambanfield@users.noreply.github.com> Co-authored-by: William Banfield <wbanfield@gmail.com>
601 lines
19 KiB
Go
601 lines
19 KiB
Go
package consensus
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
dbm "github.com/tendermint/tm-db"
|
|
|
|
abcicli "github.com/tendermint/tendermint/abci/client"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/evidence"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/libs/service"
|
|
tmsync "github.com/tendermint/tendermint/libs/sync"
|
|
mempl "github.com/tendermint/tendermint/mempool"
|
|
|
|
cfg "github.com/tendermint/tendermint/config"
|
|
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
|
|
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
|
"github.com/tendermint/tendermint/p2p"
|
|
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
|
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
|
sm "github.com/tendermint/tendermint/state"
|
|
"github.com/tendermint/tendermint/store"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
//----------------------------------------------
|
|
// byzantine failures
|
|
|
|
// Byzantine node sends two different prevotes (nil and blockID) to the same validator
|
|
func TestByzantinePrevoteEquivocation(t *testing.T) {
|
|
const nValidators = 4
|
|
const byzantineNode = 0
|
|
const prevoteHeight = int64(2)
|
|
testName := "consensus_byzantine_test"
|
|
tickerFunc := newMockTickerFunc(true)
|
|
appFunc := newCounter
|
|
|
|
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
|
|
css := make([]*State, nValidators)
|
|
|
|
for i := 0; i < nValidators; i++ {
|
|
logger := consensusLogger().With("test", "byzantine", "validator", i)
|
|
stateDB := dbm.NewMemDB() // each state needs its own db
|
|
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
|
|
DiscardABCIResponses: false,
|
|
})
|
|
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
|
|
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
|
|
defer os.RemoveAll(thisConfig.RootDir)
|
|
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
|
|
app := appFunc()
|
|
vals := types.TM2PB.ValidatorUpdates(state.Validators)
|
|
app.InitChain(abci.RequestInitChain{Validators: vals})
|
|
|
|
blockDB := dbm.NewMemDB()
|
|
blockStore := store.NewBlockStore(blockDB)
|
|
|
|
mtx := new(tmsync.Mutex)
|
|
// one for mempool, one for consensus
|
|
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
|
|
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)
|
|
|
|
// Make Mempool
|
|
var mempool mempl.Mempool
|
|
|
|
switch thisConfig.Mempool.Version {
|
|
case cfg.MempoolV0:
|
|
mempool = mempoolv0.NewCListMempool(config.Mempool,
|
|
proxyAppConnConMem,
|
|
state.LastBlockHeight,
|
|
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
|
|
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
|
|
case cfg.MempoolV1:
|
|
mempool = mempoolv1.NewTxMempool(logger,
|
|
config.Mempool,
|
|
proxyAppConnConMem,
|
|
state.LastBlockHeight,
|
|
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
|
|
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
|
|
)
|
|
}
|
|
|
|
if thisConfig.Consensus.WaitForTxs() {
|
|
mempool.EnableTxsAvailable()
|
|
}
|
|
|
|
// Make a full instance of the evidence pool
|
|
evidenceDB := dbm.NewMemDB()
|
|
evpool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
|
|
require.NoError(t, err)
|
|
evpool.SetLogger(logger.With("module", "evidence"))
|
|
|
|
// Make State
|
|
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
|
|
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
|
|
cs.SetLogger(cs.Logger)
|
|
// set private validator
|
|
pv := privVals[i]
|
|
cs.SetPrivValidator(pv)
|
|
|
|
eventBus := types.NewEventBus()
|
|
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
|
|
err = eventBus.Start()
|
|
require.NoError(t, err)
|
|
cs.SetEventBus(eventBus)
|
|
|
|
cs.SetTimeoutTicker(tickerFunc())
|
|
cs.SetLogger(logger)
|
|
|
|
css[i] = cs
|
|
}
|
|
|
|
// initialize the reactors for each of the validators
|
|
reactors := make([]*Reactor, nValidators)
|
|
blocksSubs := make([]types.Subscription, 0)
|
|
eventBuses := make([]*types.EventBus, nValidators)
|
|
for i := 0; i < nValidators; i++ {
|
|
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
|
|
reactors[i].SetLogger(css[i].Logger)
|
|
|
|
// eventBus is already started with the cs
|
|
eventBuses[i] = css[i].eventBus
|
|
reactors[i].SetEventBus(eventBuses[i])
|
|
|
|
blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, 100)
|
|
require.NoError(t, err)
|
|
blocksSubs = append(blocksSubs, blocksSub)
|
|
|
|
if css[i].state.LastBlockHeight == 0 { // simulate handle initChain in handshake
|
|
err = css[i].blockExec.Store().Save(css[i].state)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
// make connected switches and start all reactors
|
|
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
|
|
s.AddReactor("CONSENSUS", reactors[i])
|
|
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
|
|
return s
|
|
}, p2p.Connect2Switches)
|
|
|
|
// create byzantine validator
|
|
bcs := css[byzantineNode]
|
|
|
|
// alter prevote so that the byzantine node double votes when height is 2
|
|
bcs.doPrevote = func(height int64, round int32) {
|
|
// allow first height to happen normally so that byzantine validator is no longer proposer
|
|
if height == prevoteHeight {
|
|
bcs.Logger.Info("Sending two votes")
|
|
prevote1, err := bcs.signVote(tmproto.PrevoteType, bcs.ProposalBlock.Hash(), bcs.ProposalBlockParts.Header())
|
|
require.NoError(t, err)
|
|
prevote2, err := bcs.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
|
|
require.NoError(t, err)
|
|
peerList := reactors[byzantineNode].Switch.Peers().List()
|
|
bcs.Logger.Info("Getting peer list", "peers", peerList)
|
|
// send two votes to all peers (1st to one half, 2nd to another half)
|
|
for i, peer := range peerList {
|
|
if i < len(peerList)/2 {
|
|
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
|
|
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
|
|
Message: &tmcons.Vote{Vote: prevote1.ToProto()},
|
|
ChannelID: VoteChannel,
|
|
}, bcs.Logger)
|
|
} else {
|
|
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
|
|
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
|
|
Message: &tmcons.Vote{Vote: prevote2.ToProto()},
|
|
ChannelID: VoteChannel,
|
|
}, bcs.Logger)
|
|
}
|
|
}
|
|
} else {
|
|
bcs.Logger.Info("Behaving normally")
|
|
bcs.defaultDoPrevote(height, round)
|
|
}
|
|
}
|
|
|
|
// introducing a lazy proposer means that the time of the block committed is different to the
|
|
// timestamp that the other nodes have. This tests to ensure that the evidence that finally gets
|
|
// proposed will have a valid timestamp
|
|
lazyProposer := css[1]
|
|
|
|
lazyProposer.decideProposal = func(height int64, round int32) {
|
|
lazyProposer.Logger.Info("Lazy Proposer proposing condensed commit")
|
|
if lazyProposer.privValidator == nil {
|
|
panic("entered createProposalBlock with privValidator being nil")
|
|
}
|
|
|
|
var commit *types.Commit
|
|
switch {
|
|
case lazyProposer.Height == lazyProposer.state.InitialHeight:
|
|
// We're creating a proposal for the first block.
|
|
// The commit is empty, but not nil.
|
|
commit = types.NewCommit(0, 0, types.BlockID{}, nil)
|
|
case lazyProposer.LastCommit.HasTwoThirdsMajority():
|
|
// Make the commit from LastCommit
|
|
commit = lazyProposer.LastCommit.MakeCommit()
|
|
default: // This shouldn't happen.
|
|
lazyProposer.Logger.Error("enterPropose: Cannot propose anything: No commit for the previous block")
|
|
return
|
|
}
|
|
|
|
// omit the last signature in the commit
|
|
commit.Signatures[len(commit.Signatures)-1] = types.NewCommitSigAbsent()
|
|
|
|
if lazyProposer.privValidatorPubKey == nil {
|
|
// If this node is a validator & proposer in the current round, it will
|
|
// miss the opportunity to create a block.
|
|
lazyProposer.Logger.Error(fmt.Sprintf("enterPropose: %v", errPubKeyIsNotSet))
|
|
return
|
|
}
|
|
proposerAddr := lazyProposer.privValidatorPubKey.Address()
|
|
|
|
block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
|
|
lazyProposer.Height, lazyProposer.state, commit, proposerAddr,
|
|
)
|
|
|
|
// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
|
|
// and the privValidator will refuse to sign anything.
|
|
if err := lazyProposer.wal.FlushAndSync(); err != nil {
|
|
lazyProposer.Logger.Error("Error flushing to disk")
|
|
}
|
|
|
|
// Make proposal
|
|
propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
|
|
proposal := types.NewProposal(height, round, lazyProposer.ValidRound, propBlockID)
|
|
p := proposal.ToProto()
|
|
if err := lazyProposer.privValidator.SignProposal(lazyProposer.state.ChainID, p); err == nil {
|
|
proposal.Signature = p.Signature
|
|
|
|
// send proposal and block parts on internal msg queue
|
|
lazyProposer.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
|
|
for i := 0; i < int(blockParts.Total()); i++ {
|
|
part := blockParts.GetPart(i)
|
|
lazyProposer.sendInternalMessage(msgInfo{&BlockPartMessage{lazyProposer.Height, lazyProposer.Round, part}, ""})
|
|
}
|
|
lazyProposer.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
|
|
lazyProposer.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
|
|
} else if !lazyProposer.replayMode {
|
|
lazyProposer.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err)
|
|
}
|
|
}
|
|
|
|
// start the consensus reactors
|
|
for i := 0; i < nValidators; i++ {
|
|
s := reactors[i].conS.GetState()
|
|
reactors[i].SwitchToConsensus(s, false)
|
|
}
|
|
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
|
|
|
// Evidence should be submitted and committed at the third height but
|
|
// we will check the first six just in case
|
|
evidenceFromEachValidator := make([]types.Evidence, nValidators)
|
|
|
|
wg := new(sync.WaitGroup)
|
|
for i := 0; i < nValidators; i++ {
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
for msg := range blocksSubs[i].Out() {
|
|
block := msg.Data().(types.EventDataNewBlock).Block
|
|
if len(block.Evidence.Evidence) != 0 {
|
|
evidenceFromEachValidator[i] = block.Evidence.Evidence[0]
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
pubkey, err := bcs.privValidator.GetPubKey()
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case <-done:
|
|
for idx, ev := range evidenceFromEachValidator {
|
|
if assert.NotNil(t, ev, idx) {
|
|
ev, ok := ev.(*types.DuplicateVoteEvidence)
|
|
assert.True(t, ok)
|
|
assert.Equal(t, pubkey.Address(), ev.VoteA.ValidatorAddress)
|
|
assert.Equal(t, prevoteHeight, ev.Height())
|
|
}
|
|
}
|
|
case <-time.After(20 * time.Second):
|
|
for i, reactor := range reactors {
|
|
t.Logf("Consensus Reactor %d\n%v", i, reactor)
|
|
}
|
|
t.Fatalf("Timed out waiting for validators to commit evidence")
|
|
}
|
|
}
|
|
|
|
// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
|
|
// byzantine validator sends conflicting proposals into A and B,
|
|
// and prevotes/precommits on both of them.
|
|
// B sees a commit, A doesn't.
|
|
// Heal partition and ensure A sees the commit
|
|
func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
|
|
N := 4
|
|
logger := consensusLogger().With("test", "byzantine")
|
|
app := newCounter
|
|
css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app)
|
|
defer cleanup()
|
|
|
|
// give the byzantine validator a normal ticker
|
|
ticker := NewTimeoutTicker()
|
|
ticker.SetLogger(css[0].Logger)
|
|
css[0].SetTimeoutTicker(ticker)
|
|
|
|
switches := make([]*p2p.Switch, N)
|
|
p2pLogger := logger.With("module", "p2p")
|
|
for i := 0; i < N; i++ {
|
|
switches[i] = p2p.MakeSwitch(
|
|
config.P2P,
|
|
i,
|
|
"foo", "1.0.0",
|
|
func(i int, sw *p2p.Switch) *p2p.Switch {
|
|
return sw
|
|
})
|
|
switches[i].SetLogger(p2pLogger.With("validator", i))
|
|
}
|
|
|
|
blocksSubs := make([]types.Subscription, N)
|
|
reactors := make([]p2p.Reactor, N)
|
|
for i := 0; i < N; i++ {
|
|
|
|
// enable txs so we can create different proposals
|
|
assertMempool(css[i].txNotifier).EnableTxsAvailable()
|
|
// make first val byzantine
|
|
if i == 0 {
|
|
// NOTE: Now, test validators are MockPV, which by default doesn't
|
|
// do any safety checks.
|
|
css[i].privValidator.(types.MockPV).DisableChecks()
|
|
css[i].decideProposal = func(j int32) func(int64, int32) {
|
|
return func(height int64, round int32) {
|
|
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
|
|
}
|
|
}(int32(i))
|
|
// We are setting the prevote function to do nothing because the prevoting
|
|
// and precommitting are done alongside the proposal.
|
|
css[i].doPrevote = func(height int64, round int32) {}
|
|
}
|
|
|
|
eventBus := css[i].eventBus
|
|
eventBus.SetLogger(logger.With("module", "events", "validator", i))
|
|
|
|
var err error
|
|
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
|
|
require.NoError(t, err)
|
|
|
|
conR := NewReactor(css[i], true) // so we don't start the consensus states
|
|
conR.SetLogger(logger.With("validator", i))
|
|
conR.SetEventBus(eventBus)
|
|
|
|
var conRI p2p.Reactor = conR
|
|
|
|
// make first val byzantine
|
|
if i == 0 {
|
|
conRI = NewByzantineReactor(conR)
|
|
}
|
|
|
|
reactors[i] = conRI
|
|
err = css[i].blockExec.Store().Save(css[i].state) // for save height 1's validators info
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
defer func() {
|
|
for _, r := range reactors {
|
|
if rr, ok := r.(*ByzantineReactor); ok {
|
|
err := rr.reactor.Switch.Stop()
|
|
require.NoError(t, err)
|
|
} else {
|
|
err := r.(*Reactor).Switch.Stop()
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
|
|
// ignore new switch s, we already made ours
|
|
switches[i].AddReactor("CONSENSUS", reactors[i])
|
|
return switches[i]
|
|
}, func(sws []*p2p.Switch, i, j int) {
|
|
// the network starts partitioned with globally active adversary
|
|
if i != 0 {
|
|
return
|
|
}
|
|
p2p.Connect2Switches(sws, i, j)
|
|
})
|
|
|
|
// start the non-byz state machines.
|
|
// note these must be started before the byz
|
|
for i := 1; i < N; i++ {
|
|
cr := reactors[i].(*Reactor)
|
|
cr.SwitchToConsensus(cr.conS.GetState(), false)
|
|
}
|
|
|
|
// start the byzantine state machine
|
|
byzR := reactors[0].(*ByzantineReactor)
|
|
s := byzR.reactor.conS.GetState()
|
|
byzR.reactor.SwitchToConsensus(s, false)
|
|
|
|
// byz proposer sends one block to peers[0]
|
|
// and the other block to peers[1] and peers[2].
|
|
// note peers and switches order don't match.
|
|
peers := switches[0].Peers().List()
|
|
|
|
// partition A
|
|
ind0 := getSwitchIndex(switches, peers[0])
|
|
|
|
// partition B
|
|
ind1 := getSwitchIndex(switches, peers[1])
|
|
ind2 := getSwitchIndex(switches, peers[2])
|
|
p2p.Connect2Switches(switches, ind1, ind2)
|
|
|
|
// wait for someone in the big partition (B) to make a block
|
|
<-blocksSubs[ind2].Out()
|
|
|
|
t.Logf("A block has been committed. Healing partition")
|
|
p2p.Connect2Switches(switches, ind0, ind1)
|
|
p2p.Connect2Switches(switches, ind0, ind2)
|
|
|
|
// wait till everyone makes the first new block
|
|
// (one of them already has)
|
|
wg := new(sync.WaitGroup)
|
|
for i := 1; i < N-1; i++ {
|
|
wg.Add(1)
|
|
go func(j int) {
|
|
<-blocksSubs[j].Out()
|
|
wg.Done()
|
|
}(i)
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
tick := time.NewTicker(time.Second * 10)
|
|
select {
|
|
case <-done:
|
|
case <-tick.C:
|
|
for i, reactor := range reactors {
|
|
t.Logf(fmt.Sprintf("Consensus Reactor %v", i))
|
|
t.Logf(fmt.Sprintf("%v", reactor))
|
|
}
|
|
t.Fatalf("Timed out waiting for all validators to commit first block")
|
|
}
|
|
}
|
|
|
|
//-------------------------------
|
|
// byzantine consensus functions
|
|
|
|
func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
|
|
// byzantine user should create two proposals and try to split the vote.
|
|
// Avoid sending on internalMsgQueue and running consensus state.
|
|
|
|
// Create a new proposal block from state/txs from the mempool.
|
|
block1, blockParts1 := cs.createProposalBlock()
|
|
polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
|
|
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
|
|
p1 := proposal1.ToProto()
|
|
if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
proposal1.Signature = p1.Signature
|
|
|
|
// some new transactions come in (this ensures that the proposals are different)
|
|
deliverTxsRange(cs, 0, 1)
|
|
|
|
// Create a new proposal block from state/txs from the mempool.
|
|
block2, blockParts2 := cs.createProposalBlock()
|
|
polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
|
|
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
|
|
p2 := proposal2.ToProto()
|
|
if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
proposal2.Signature = p2.Signature
|
|
|
|
block1Hash := block1.Hash()
|
|
block2Hash := block2.Hash()
|
|
|
|
// broadcast conflicting proposals/block parts to peers
|
|
peers := sw.Peers().List()
|
|
t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
|
|
for i, peer := range peers {
|
|
if i < len(peers)/2 {
|
|
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
|
|
} else {
|
|
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
|
|
}
|
|
}
|
|
}
|
|
|
|
func sendProposalAndParts(
|
|
height int64,
|
|
round int32,
|
|
cs *State,
|
|
peer p2p.Peer,
|
|
proposal *types.Proposal,
|
|
blockHash []byte,
|
|
parts *types.PartSet,
|
|
) {
|
|
// proposal
|
|
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
|
|
ChannelID: DataChannel,
|
|
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
|
|
}, cs.Logger)
|
|
|
|
// parts
|
|
for i := 0; i < int(parts.Total()); i++ {
|
|
part := parts.GetPart(i)
|
|
pp, err := part.ToProto()
|
|
if err != nil {
|
|
panic(err) // TODO: wbanfield better error handling
|
|
}
|
|
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
|
|
ChannelID: DataChannel,
|
|
Message: &tmcons.BlockPart{
|
|
Height: height, // This tells peer that this part applies to us.
|
|
Round: round, // This tells peer that this part applies to us.
|
|
Part: *pp,
|
|
},
|
|
}, cs.Logger)
|
|
}
|
|
|
|
// votes
|
|
cs.mtx.Lock()
|
|
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
|
|
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
|
|
cs.mtx.Unlock()
|
|
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
|
|
ChannelID: VoteChannel,
|
|
Message: &tmcons.Vote{Vote: prevote.ToProto()},
|
|
}, cs.Logger)
|
|
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
|
|
ChannelID: VoteChannel,
|
|
Message: &tmcons.Vote{Vote: precommit.ToProto()},
|
|
}, cs.Logger)
|
|
}
|
|
|
|
//----------------------------------------
|
|
// byzantine consensus reactor
|
|
|
|
type ByzantineReactor struct {
|
|
service.Service
|
|
reactor *Reactor
|
|
}
|
|
|
|
func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
|
|
return &ByzantineReactor{
|
|
Service: conR,
|
|
reactor: conR,
|
|
}
|
|
}
|
|
|
|
// SetSwitch forwards the switch to the wrapped reactor.
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) {
	br.reactor.SetSwitch(s)
}
|
|
// GetChannels returns the channel descriptors of the wrapped reactor.
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor {
	channels := br.reactor.GetChannels()
	return channels
}
|
|
func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
|
|
if !br.reactor.IsRunning() {
|
|
return
|
|
}
|
|
|
|
// Create peerState for peer
|
|
peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
|
|
peer.Set(types.PeerStateKey, peerState)
|
|
|
|
// Send our state to peer.
|
|
// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
|
|
if !br.reactor.waitSync {
|
|
br.reactor.sendNewRoundStepMessage(peer)
|
|
}
|
|
}
|
|
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
|
br.reactor.RemovePeer(peer, reason)
|
|
}
|
|
func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
|
|
br.reactor.ReceiveEnvelope(e)
|
|
}
|
|
func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) {
|
|
br.reactor.Receive(chID, p, m)
|
|
}
|
|
// InitPeer returns the peer it was given without modifying it.
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer {
	return peer
}
|