Compare commits

...

10 Commits

Author SHA1 Message Date
Callum Waters 7018c73baf fix up a few things 2022-11-09 15:17:43 +01:00
Callum Waters 4021df503b lint 2022-11-09 13:51:33 +01:00
Callum Waters 11976a4863 fix lints 2022-11-01 14:18:13 +01:00
Callum Waters 296ec7e113 merge 2022-11-01 11:58:53 +01:00
Callum Waters 933256c862 node start up phases 2022-11-01 11:26:46 +01:00
Callum Waters a2610a9998 lint 2022-10-26 17:07:23 +02:00
Callum Waters da8810f480 fix gosec compaints 2022-10-26 17:01:11 +02:00
Callum Waters 10dca45e8c Merge branch 'main' of github.com:tendermint/tendermint 2022-10-26 16:45:32 +02:00
Callum Waters 67bc51ad72 split out node into a setup file 2022-10-26 16:45:27 +02:00
Callum Waters 3faf580a0d remove unused id file 2022-10-26 16:44:45 +02:00
35 changed files with 667 additions and 542 deletions

blocksync/metrics.gen.go Normal file
View File

@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.
package blocksync
import (
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "syncing",
Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
Syncing: discard.NewGauge(),
}
}

blocksync/metrics.go Normal file
View File

@@ -0,0 +1,19 @@
package blocksync
import (
"github.com/go-kit/kit/metrics"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "blocksync"
)
//go:generate go run ../scripts/metricsgen -struct=Metrics
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is block syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}
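As a rough usage sketch (not part of the diff): the generated PrometheusMetrics constructor above consumes labelsAndValues as key/value pairs, and the reactor flips the Syncing gauge while block syncing. The namespace and chain_id label below are illustrative assumptions.

```go
package main

import "github.com/tendermint/tendermint/blocksync"

func main() {
	// Hypothetical wiring; "tendermint" and the chain_id label are assumptions.
	// labelsAndValues is read in pairs: ("chain_id", "test-chain").
	m := blocksync.PrometheusMetrics("tendermint", "chain_id", "test-chain")

	// The reactor sets the gauge to 1 while block syncing and back to 0 when done.
	m.Syncing.Set(1)
	defer m.Syncing.Set(0)
}
```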

View File

@@ -99,6 +99,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
// OnStart implements service.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
if pool.height == 0 {
return errors.New("height not set")
}
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
@@ -111,22 +114,19 @@ func (pool *BlockPool) makeRequestersRoutine() {
break
}
_, numPending, lenRequesters := pool.GetStatus()
switch {
case numPending >= maxPendingRequests:
height, maxPeerHeight, numPending, lenRequesters := pool.GetStatus()
if height >= maxPeerHeight ||
numPending >= maxPendingRequests ||
lenRequesters >= maxTotalRequesters {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
case lenRequesters >= maxTotalRequesters:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
default:
// request for more blocks.
pool.makeNextRequester()
continue
}
// request for more blocks.
pool.makeNextRequester()
}
}
@@ -156,11 +156,11 @@ func (pool *BlockPool) removeTimedoutPeers() {
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
func (pool *BlockPool) GetStatus() (height, maxPeerHeight int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
return pool.height, pool.maxPeerHeight, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// IsCaughtUp returns true if this node is caught up, false - otherwise.
@@ -302,6 +302,7 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
}
if height > pool.maxPeerHeight {
pool.Logger.Info("new max peer height", "height", height)
pool.maxPeerHeight = height
}
}
@@ -388,7 +389,7 @@ func (pool *BlockPool) makeNextRequester() {
err := request.Start()
if err != nil {
request.Logger.Error("Error starting request", "err", err)
pool.Logger.Error("Error starting request", "err", err)
}
}
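A minimal sketch of the new start-up contract implied by this hunk, assuming the caller sets the pool's height (as the reactor's SwitchToBlockSync does below) before calling Start; a zero height now fails OnStart:

```go
package blocksync

// Sketch only: in-package helper illustrating the intended call order.
// startHeight is expected to be LastBlockHeight+1, or InitialHeight for a new chain.
func startPoolSketch(pool *BlockPool, startHeight int64) error {
	pool.height = startHeight // must be non-zero or OnStart returns "height not set"
	if err := pool.Start(); err != nil {
		return err
	}
	// GetStatus now also reports the max peer height alongside the old values.
	height, maxPeerHeight, numPending, lenRequesters := pool.GetStatus()
	_, _, _, _ = height, maxPeerHeight, numPending, lenRequesters
	return nil
}
```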

View File

@@ -32,7 +32,7 @@ const (
type consensusReactor interface {
// for when we switch from blocksync reactor and block sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
SwitchToConsensus(state sm.State, skipWAL bool) error
}
type peerError struct {
@@ -54,16 +54,15 @@ type Reactor struct {
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
blockSync bool
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
metrics *Metrics
}
// NewReactor returns new reactor instance.
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
blockSync bool) *Reactor {
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, metrics *Metrics) *Reactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
@@ -85,9 +84,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
metrics: metrics,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
@@ -101,22 +100,22 @@ func (bcR *Reactor) SetLogger(l log.Logger) {
// OnStart implements service.Service.
func (bcR *Reactor) OnStart() error {
if bcR.blockSync {
err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine(false)
}
return nil
}
// IsSyncing returns whether the node is using blocksync to advance heights
func (bcR *Reactor) IsSyncing() bool {
return bcR.pool.IsRunning()
}
// SwitchToBlockSync is called by the state sync reactor when switching to block sync.
func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
bcR.blockSync = true
bcR.initialState = state
bcR.pool.height = state.LastBlockHeight + 1
if state.LastBlockHeight == 0 {
bcR.pool.height = state.InitialHeight
} else {
bcR.pool.height = state.LastBlockHeight + 1
}
err := bcR.pool.Start()
if err != nil {
return err
@@ -127,7 +126,7 @@ func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
// OnStop implements service.Service.
func (bcR *Reactor) OnStop() {
if bcR.blockSync {
if bcR.pool.IsRunning() {
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
@@ -236,6 +235,8 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
bcR.metrics.Syncing.Set(1)
defer bcR.metrics.Syncing.Set(0)
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
defer trySyncTicker.Stop()
@@ -293,24 +294,28 @@ FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
height, peerHeight, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
"outbound", outbound, "inbound", inbound, "peerHeight", peerHeight)
if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
// TODO: node struct should be responsible for switching from block sync to
// consensus. It's messy to have to grab the consensus reactor from the switch
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
if err := conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced); err != nil {
bcR.Logger.Error("failed to switch to consensus", "err", err)
}
}
// else {
// should only happen during testing
// }
break FOR_LOOP
return
}
case <-trySyncTicker.C: // chan time

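For context, a hedged sketch of how a caller is now expected to construct and start the blocksync reactor: metrics are passed at construction, OnStart no longer launches the pool, and syncing begins only when SwitchToBlockSync is invoked (the namespace below is an assumption):

```go
package example

import (
	"github.com/tendermint/tendermint/blocksync"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
)

// Sketch only: wiring implied by the new NewReactor/SwitchToBlockSync split.
func startBlockSync(state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore) (*blocksync.Reactor, error) {
	metrics := blocksync.PrometheusMetrics("tendermint") // namespace is illustrative
	bcR := blocksync.NewReactor(state.Copy(), blockExec, blockStore, metrics)

	if err := bcR.Start(); err != nil { // no longer starts the pool by itself
		return nil, err
	}
	if err := bcR.SwitchToBlockSync(state); err != nil { // explicitly begin block sync
		return nil, err
	}
	return bcR, nil
}
```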
View File

@@ -95,14 +95,6 @@ func newReactor(
mock.Anything,
mock.Anything).Return(nil)
// Make the Reactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
db := dbm.NewMemDB()
stateStore = sm.NewStore(db, sm.StoreOptions{
DiscardABCIResponses: false,
})
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mp, sm.EmptyEvidencePool{}, blockStore)
if err = stateStore.Save(state); err != nil {
@@ -145,7 +137,7 @@ func newReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, NopMetrics())
bcReactor.SetLogger(logger.With("module", "blocksync"))
return ReactorPair{bcReactor, proxyApp}
@@ -156,6 +148,9 @@ func TestNoBlockResponse(t *testing.T) {
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
maxBlockHeight := int64(65)
reactorPairs := make([]ReactorPair, 2)
@@ -169,6 +164,12 @@ func TestNoBlockResponse(t *testing.T) {
}, p2p.Connect2Switches)
for _, reactor := range reactorPairs {
// turn on the syncing algorithm
err := reactor.reactor.SwitchToBlockSync(state)
require.NoError(t, err)
}
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
@@ -218,6 +219,9 @@ func TestBadBlockStopsPeer(t *testing.T) {
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
maxBlockHeight := int64(148)
// Other chain needs a different validator set
@@ -244,6 +248,12 @@ func TestBadBlockStopsPeer(t *testing.T) {
}, p2p.Connect2Switches)
for _, reactor := range reactorPairs {
// turn on the syncing algorithm
err := reactor.reactor.SwitchToBlockSync(state)
require.NoError(t, err)
}
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
@@ -287,6 +297,11 @@ func TestBadBlockStopsPeer(t *testing.T) {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
otherState, err := sm.MakeGenesisState(otherGenDoc)
require.NoError(t, err)
err = lastReactorPair.reactor.SwitchToBlockSync(otherState)
require.NoError(t, err)
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break

View File

@@ -46,6 +46,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
appFunc := newKVStore
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
css := make([]*State, nValidators)
for i := 0; i < nValidators; i++ {
@@ -54,7 +56,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -102,7 +103,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
@@ -125,7 +126,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, nValidators)
for i := 0; i < nValidators; i++ {
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
// Note, we dont start the consensus states
reactors[i] = NewReactor(css[i])
reactors[i].SetLogger(css[i].Logger)
// eventBus is already started with the cs
@@ -254,8 +256,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// start the consensus reactors
for i := 0; i < nValidators; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, false)
require.NoError(t, reactors[i].SwitchToConsensus(state.Copy(), false))
}
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -314,7 +315,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
N := 4
logger := consensusLogger().With("test", "byzantine")
app := newKVStore
css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app)
css, cleanup := randConsensusNet(t, N, "consensus_byzantine_test", newMockTickerFunc(false), app)
defer cleanup()
// give the byzantine validator a normal ticker
@@ -363,7 +364,8 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
conR := NewReactor(css[i], true) // so we don't start the consensus states
// Note, we don't start the consensus states
conR := NewReactor(css[i])
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)
@@ -407,13 +409,13 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// note these must be started before the byz
for i := 1; i < N; i++ {
cr := reactors[i].(*Reactor)
cr.SwitchToConsensus(cr.conS.GetState(), false)
require.NoError(t, cr.SwitchToConsensus(cr.conS.GetState(), false))
}
// start the byzantine state machine
byzR := reactors[0].(*ByzantineReactor)
s := byzR.reactor.conS.GetState()
byzR.reactor.SwitchToConsensus(s, false)
require.NoError(t, byzR.reactor.SwitchToConsensus(s, false))
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
@@ -592,7 +594,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.waitSync {
if br.reactor.conS.IsRunning() {
br.reactor.sendNewRoundStepMessage(peer)
}
}

View File

@@ -440,7 +440,7 @@ func newStateWithConfigAndBlockStore(
}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
@@ -747,18 +747,17 @@ func consensusLogger() log.Logger {
}).With("module", "consensus")
}
func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker,
func randConsensusNet(t *testing.T, nValidators int, testName string, tickerFunc func() TimeoutTicker,
appFunc func() abci.Application, configOpts ...func(*cfg.Config)) ([]*State, cleanupFunc) {
t.Helper()
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
css := make([]*State, nValidators)
logger := consensusLogger()
configRootDirs := make([]string, 0, nValidators)
for i := 0; i < nValidators; i++ {
stateDB := dbm.NewMemDB() // each state needs its own db
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
configRootDirs = append(configRootDirs, thisConfig.RootDir)
for _, opt := range configOpts {
@@ -772,6 +771,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
css[i].updateToState(state.Copy())
}
return css, func() {
for _, dir := range configRootDirs {

View File

@@ -19,7 +19,7 @@ import (
// Ensure a testnet makes blocks
func TestReactorInvalidPrecommit(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
for i := 0; i < 4; i++ {

View File

@@ -118,18 +118,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "latest_block_height",
Help: "The latest block height.",
}, labels).With(labelsAndValues...),
BlockSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_syncing",
Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "state_syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
@@ -208,8 +196,6 @@ func NopMetrics() *Metrics {
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
BlockSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
StepDurationSeconds: discard.NewHistogram(),
BlockGossipPartsReceived: discard.NewCounter(),

View File

@@ -61,10 +61,6 @@ type Metrics struct {
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge `metrics_name:"latest_block_height"`
// Whether or not a node is block syncing. 1 if yes, 0 if no.
BlockSyncing metrics.Gauge
// Whether or not a node is state syncing. 1 if yes, 0 if no.
StateSyncing metrics.Gauge
// Number of block parts transmitted by each peer.
BlockParts metrics.Counter `metrics_labels:"peer_id"`

View File

@@ -42,7 +42,6 @@ type Reactor struct {
conS *State
mtx tmsync.RWMutex
waitSync bool
eventBus *types.EventBus
rs *cstypes.RoundState
@@ -53,12 +52,11 @@ type ReactorOption func(*Reactor)
// NewReactor returns a new Reactor with the given
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
func NewReactor(consensusState *State, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
@@ -72,21 +70,12 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
// OnStart implements BaseService by subscribing to events, which later will be
// broadcasted to other peers and starting state if we're not in block sync.
func (conR *Reactor) OnStart() error {
conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync())
// start routine that computes peer statistics for evaluating peer quality
go conR.peerStatsRoutine()
conR.subscribeToBroadcastEvents()
go conR.updateRoundStateRoutine()
if !conR.WaitSync() {
err := conR.conS.Start()
if err != nil {
return err
}
}
return nil
}
@@ -94,47 +83,34 @@ func (conR *Reactor) OnStart() error {
// state.
func (conR *Reactor) OnStop() {
conR.unsubscribeFromBroadcastEvents()
if err := conR.conS.Stop(); err != nil {
conR.Logger.Error("Error stopping consensus state", "err", err)
}
if !conR.WaitSync() {
if conR.conS.IsRunning() {
if err := conR.conS.Stop(); err != nil {
conR.Logger.Error("Error stopping consensus state", "err", err)
}
conR.conS.Wait()
}
}
func (conR *Reactor) IsConsensusRunning() bool {
return conR.conS.IsRunning()
}
// SwitchToConsensus switches from block_sync mode to consensus mode.
// It resets the state, turns off block_sync, and starts the consensus state-machine
func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) error {
conR.Logger.Info("SwitchToConsensus")
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
if state.LastBlockHeight > state.InitialHeight {
conR.conS.reconstructLastCommit(state)
}
// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
// NewRoundStepMessage.
conR.conS.updateToState(state)
conR.mtx.Lock()
conR.waitSync = false
conR.mtx.Unlock()
conR.Metrics.BlockSyncing.Set(0)
conR.Metrics.StateSyncing.Set(0)
if skipWAL {
conR.conS.doWALCatchup = false
}
err := conR.conS.Start()
if err != nil {
panic(fmt.Sprintf(`Failed to start consensus state: %v
conS:
%+v
conR:
%+v`, err, conR.conS, conR))
}
return conR.conS.Start()
}
// GetChannels implements Reactor
@@ -201,7 +177,7 @@ func (conR *Reactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're block_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.WaitSync() {
if conR.conS.IsRunning() {
conR.sendNewRoundStepMessage(peer)
}
}
@@ -311,7 +287,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
case DataChannel:
if conR.WaitSync() {
if !conR.conS.IsRunning() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
@@ -330,7 +306,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
case VoteChannel:
if conR.WaitSync() {
if !conR.conS.IsRunning() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
@@ -352,7 +328,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
case VoteSetBitsChannel:
if conR.WaitSync() {
if !conR.conS.IsRunning() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
@@ -393,13 +369,6 @@ func (conR *Reactor) SetEventBus(b *types.EventBus) {
conR.conS.SetEventBus(b)
}
// WaitSync returns whether the consensus reactor is waiting for state/block sync.
func (conR *Reactor) WaitSync() bool {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.waitSync
}
//--------------------------------------
// subscribeToBroadcastEvents subscribes for new round steps and votes
@@ -543,6 +512,11 @@ OUTER_LOOP:
if !peer.IsRunning() || !conR.IsRunning() {
return
}
if !conR.IsConsensusRunning() {
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
continue OUTER_LOOP
}
rs := conR.getRoundState()
prs := ps.GetRoundState()
@@ -687,7 +661,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
}
return
}
// logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
}
@@ -697,12 +671,16 @@ func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
// Simple hack to throttle logs upon sleep.
var sleeping = 0
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
return
}
if !conR.IsConsensusRunning() {
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
continue
}
rs := conR.getRoundState()
prs := ps.GetRoundState()
@@ -713,14 +691,11 @@ OUTER_LOOP:
sleeping = 0
}
// logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
// "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
continue OUTER_LOOP
continue
}
}
@@ -729,7 +704,7 @@ OUTER_LOOP:
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
continue
}
}
@@ -742,7 +717,7 @@ OUTER_LOOP:
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ps.PickSendVote(commit) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
continue
}
}
}
@@ -759,7 +734,7 @@ OUTER_LOOP:
}
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
continue OUTER_LOOP
continue
}
}
@@ -833,6 +808,11 @@ OUTER_LOOP:
return
}
if !conR.IsConsensusRunning() {
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
continue OUTER_LOOP
}
// Maybe send Height/Round/Prevotes
{
rs := conR.getRoundState()
@@ -918,8 +898,6 @@ OUTER_LOOP:
}
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
continue OUTER_LOOP
}
}
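A hedged sketch of the consensus reactor lifecycle after this change: the waitSync flag is gone, OnStart no longer starts the consensus state, and callers start it (and must check the error) through SwitchToConsensus, while the gossip routines simply sleep until consensus is running:

```go
package example

import (
	cs "github.com/tendermint/tendermint/consensus"
	sm "github.com/tendermint/tendermint/state"
)

// Sketch only: construction and hand-off implied by the diff.
func startConsensus(conS *cs.State, state sm.State) (*cs.Reactor, error) {
	conR := cs.NewReactor(conS) // gossip routines idle until consensus is running
	if err := conR.Start(); err != nil {
		return nil, err
	}
	if err := conR.SwitchToConsensus(state, false); err != nil {
		return nil, err
	}
	return conR, nil
}
```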

View File

@@ -55,9 +55,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, n)
for i := 0; i < n; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
if err != nil { t.Fatal(err)}*/
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
// Note, we dont start the consensus states
reactors[i] = NewReactor(css[i])
reactors[i].SetLogger(css[i].Logger)
// eventBus is already started with the cs
@@ -88,7 +87,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
// TODO: is this still true with new pubsub?
for i := 0; i < n; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, false)
err := reactors[i].SwitchToConsensus(s, false)
require.NoError(t, err)
}
return reactors, blocksSubs, eventBuses
}
@@ -113,7 +113,7 @@ func stopConsensusNet(logger log.Logger, reactors []*Reactor, eventBuses []*type
// Ensure a testnet makes blocks
func TestReactorBasic(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -135,6 +135,8 @@ func TestReactorWithEvidence(t *testing.T) {
// css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
css := make([]*State, nValidators)
logger := consensusLogger()
for i := 0; i < nValidators; i++ {
@@ -142,7 +144,6 @@ func TestReactorWithEvidence(t *testing.T) {
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -203,7 +204,7 @@ func TestReactorWithEvidence(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool2, WithState(state.Copy()))
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
@@ -237,7 +238,7 @@ func TestReactorWithEvidence(t *testing.T) {
// Ensure a testnet makes blocks when there are txs
func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore,
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore,
func(c *cfg.Config) {
c.Consensus.CreateEmptyBlocks = false
})
@@ -258,7 +259,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, _, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -284,7 +285,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, _, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -310,7 +311,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
// Test we record stats about votes and block parts from other peers.
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -336,6 +337,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
nVals := 4
logger := log.TestingLogger()
css, cleanup := randConsensusNet(
t,
nVals,
"consensus_voting_power_changes_test",
newMockTickerFunc(true),
@@ -531,7 +533,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// Check we can make blocks with skip_timeout_commit=false
func TestReactorWithTimeoutCommit(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore)
defer cleanup()
// override default SkipTimeoutCommit == true for tests
for i := 0; i < N; i++ {

View File

@@ -129,8 +129,8 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
}
pb.cs.Wait()
newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
newCS := NewState(pb.cs.config, pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, WithState(pb.genesisState.Copy()))
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()
@@ -332,8 +332,8 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
consensusState := NewState(csConfig, blockExec,
blockStore, mempool, evpool, WithState(state.Copy()))
consensusState.SetEventBus(eventBus)
return consensusState

View File

@@ -67,7 +67,8 @@ func TestMain(m *testing.M) {
func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger()
state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
state, err := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
consensusReplayConfig,
@@ -81,7 +82,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
bytes, _ := os.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)
err := cs.Start()
err = cs.Start()
require.NoError(t, err)
defer func() {
if err := cs.Stop(); err != nil {
@@ -555,40 +556,40 @@ func TestSimulateValidatorsChange(t *testing.T) {
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, false)
testHandshakeReplay(t, 0, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, true)
testHandshakeReplay(t, 0, m, true)
}
}
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, false)
testHandshakeReplay(t, 2, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, true)
testHandshakeReplay(t, 2, m, true)
}
}
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, false)
testHandshakeReplay(t, numBlocks-1, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, true)
testHandshakeReplay(t, numBlocks-1, m, true)
}
}
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, false)
testHandshakeReplay(t, numBlocks, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, true)
testHandshakeReplay(t, numBlocks, m, true)
}
}
@@ -660,25 +661,27 @@ func tempWALWithData(data []byte) string {
// Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) {
var chain []*types.Block
var commits []*types.Commit
var store *mockBlockStore
var stateDB dbm.DB
var genesisState sm.State
func testHandshakeReplay(t *testing.T, nBlocks int, mode uint, testValidatorsChange bool) {
var (
chain []*types.Block
commits []*types.Commit
store *mockBlockStore
stateDB dbm.DB
genesisState sm.State
config *cfg.Config
)
if testValidatorsChange {
testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
defer os.RemoveAll(testConfig.RootDir)
config = ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
defer os.RemoveAll(config.RootDir)
stateDB = dbm.NewMemDB()
genesisState = sim.GenesisState
config = sim.Config
chain = append([]*types.Block{}, sim.Chain...) // copy chain
commits = sim.Commits
store = newMockBlockStore(t, config, genesisState.ConsensusParams)
} else { // test single node
testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
defer os.RemoveAll(testConfig.RootDir)
config = ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
defer os.RemoveAll(config.RootDir)
walBody, err := WALWithNBlocks(t, numBlocks)
require.NoError(t, err)
walFile := tempWALWithData(walBody)
@@ -811,14 +814,11 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
_, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
Validators: validators,
}); err != nil {
panic(err)
}
if err := stateStore.Save(state); err != nil { // save height 1's validatorsInfo
panic(err)
}
})
require.NoError(t, err)
require.NoError(t, stateStore.Save(state))
switch mode {
case 0:
for i := 0; i < nBlocks; i++ {

View File

@@ -148,7 +148,6 @@ type StateOption func(*State)
// NewState returns a new State.
func NewState(
config *cfg.ConsensusConfig,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
@@ -177,13 +176,6 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
}
cs.updateToState(state)
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(nil, "State", cs)
@@ -207,10 +199,19 @@ func (cs *State) SetEventBus(b *types.EventBus) {
}
// StateMetrics sets the metrics.
func StateMetrics(metrics *Metrics) StateOption {
func WithMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}
func WithState(state sm.State) StateOption {
return func(cs *State) {
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
}
cs.updateToState(state)
}
}
// String returns a string.
func (cs *State) String() string {
// better not to access shared variables
@@ -297,6 +298,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
func (cs *State) OnStart() error {
if cs.state.IsEmpty() {
return errors.New("no state to commence consensus on")
}
// We may set the WAL in testing before calling Start, so only OpenWAL if its
// still the nilWAL.
if _, ok := cs.wal.(nilWAL); ok {
@@ -612,7 +617,7 @@ func (cs *State) updateToState(state sm.State) {
// signal the new round step, because other services (eg. txNotifier)
// depend on having an up-to-date peer state!
if state.LastBlockHeight <= cs.state.LastBlockHeight {
cs.Logger.Debug(
cs.Logger.Info(
"ignoring updateToState()",
"new_height", state.LastBlockHeight+1,
"old_height", cs.state.LastBlockHeight+1,
@@ -2275,11 +2280,10 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
return nil
}
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
cs.Logger.Info("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}
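A short sketch of the resulting functional-options construction, written as if inside the consensus package since the txNotifier and evidence-pool parameter types are unexported; everything not shown in the diff is assumed:

```go
package consensus

import (
	cfg "github.com/tendermint/tendermint/config"
	sm "github.com/tendermint/tendermint/state"
)

// Sketch only: the state is now injected via WithState rather than passed
// positionally, and metrics via WithMetrics (formerly StateMetrics).
func newStateSketch(
	config *cfg.ConsensusConfig,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mempool txNotifier,
	evpool evidencePool,
	metrics *Metrics,
) *State {
	return NewState(config, blockExec, blockStore, mempool, evpool,
		WithState(state.Copy()),
		WithMetrics(metrics),
	)
}
```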

View File

@@ -121,7 +121,7 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool,
return
}
voteSet := hvs.getVoteSet(vote.Round, vote.Type)
if voteSet == nil {
if voteSet.IsEmpty() {
if rndz := hvs.peerCatchupRounds[peerID]; len(rndz) < 2 {
hvs.addRound(vote.Round)
voteSet = hvs.getVoteSet(vote.Round, vote.Type)
@@ -166,7 +166,7 @@ func (hvs *HeightVoteSet) POLInfo() (polRound int32, polBlockID types.BlockID) {
func (hvs *HeightVoteSet) getVoteSet(round int32, voteType tmproto.SignedMsgType) *types.VoteSet {
rvs, ok := hvs.roundVoteSets[round]
if !ok {
return nil
return &types.VoteSet{}
}
switch voteType {
case tmproto.PrevoteType:

View File

@@ -5,6 +5,7 @@ import (
"os"
"testing"
"github.com/stretchr/testify/require"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/test"
@@ -30,30 +31,23 @@ func TestPeerCatchupRounds(t *testing.T) {
vote999_0 := makeVoteHR(t, 1, 0, 999, privVals)
added, err := hvs.AddVote(vote999_0, "peer1")
if !added || err != nil {
t.Error("Expected to successfully add vote from peer", added, err)
}
require.NoError(t, err)
require.True(t, added)
vote1000_0 := makeVoteHR(t, 1, 0, 1000, privVals)
added, err = hvs.AddVote(vote1000_0, "peer1")
if !added || err != nil {
t.Error("Expected to successfully add vote from peer", added, err)
}
require.NoError(t, err)
require.True(t, added)
vote1001_0 := makeVoteHR(t, 1, 0, 1001, privVals)
added, err = hvs.AddVote(vote1001_0, "peer1")
if err != ErrGotVoteFromUnwantedRound {
t.Errorf("expected GotVoteFromUnwantedRoundError, but got %v", err)
}
if added {
t.Error("Expected to *not* add vote from peer, too many catchup rounds.")
}
require.Error(t, err)
require.Equal(t, ErrGotVoteFromUnwantedRound, err)
require.False(t, added)
added, err = hvs.AddVote(vote1001_0, "peer2")
if !added || err != nil {
t.Error("Expected to successfully add vote from another peer")
}
require.NoError(t, err)
require.True(t, added)
}
func makeVoteHR(t *testing.T, height int64, valIndex, round int32, privVals []types.PrivValidator) *types.Vote {

View File

@@ -86,7 +86,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState := NewState(config.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil {

View File

@@ -11,7 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
bc "github.com/tendermint/tendermint/blocksync"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/evidence"
@@ -50,7 +50,6 @@ type Node struct {
privValidator types.PrivValidator // local node's validator key
// network
transport *p2p.MultiplexTransport
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
@@ -60,11 +59,10 @@ type Node struct {
// services
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for block-syncing
mempoolReactor p2p.Reactor // for gossipping transactions
blockStore *store.BlockStore // store the blockchain to disk
bcReactor *blocksync.Reactor // for block-syncing
mempoolReactor p2p.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
stateSyncGenesis sm.State // provides the genesis state for state sync
@@ -79,6 +77,7 @@ type Node struct {
indexerService *txindex.IndexerService
prometheusSrv *http.Server
pprofSrv *http.Server
customReactors map[string]p2p.Reactor
}
// Option sets a parameter for the node.
@@ -98,29 +97,7 @@ type Option func(*Node)
// - STATESYNC
func CustomReactors(reactors map[string]p2p.Reactor) Option {
return func(n *Node) {
for name, reactor := range reactors {
if existingReactor := n.sw.Reactor(name); existingReactor != nil {
n.sw.Logger.Info("Replacing existing reactor with a custom one",
"name", name, "existing", existingReactor, "custom", reactor)
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
// register the new channels to the nodeInfo
// NOTE: This is a bit messy now with the type casting but is
// cleaned up in the following version when NodeInfo is changed from
// and interface to a concrete type
if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
for _, chDesc := range reactor.GetChannels() {
if !ni.HasChannel(chDesc.ID) {
ni.Channels = append(ni.Channels, chDesc.ID)
n.transport.AddChannel(chDesc.ID)
}
}
n.nodeInfo = ni
} else {
n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
}
}
n.customReactors = reactors
}
}
@@ -160,74 +137,26 @@ func NewNode(config *cfg.Config,
return nil, err
}
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
proxyApp, err := createProxyAppConns(clientCreator, logger, abciMetrics)
if err != nil {
return nil, err
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus, err := createAndStartEventBus(logger)
if err != nil {
return nil, err
}
eventBus := createEventBus(logger)
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
indexerService, txIndexer, blockIndexer, err := createIndexerService(config,
genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}
// If an address is provided, listen on the socket for a connection from an
// external signing process.
if config.PrivValidatorListenAddr != "" {
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator socket client: %w", err)
}
}
pubKey, err := privValidator.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
// Determine whether we should attempt state sync.
stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, fmt.Errorf("cannot load state: %w", err)
}
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, consensusLogger)
// Make MempoolReactor
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
@@ -249,21 +178,16 @@ func NewNode(config *cfg.Config,
)
// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, bsMetrics, logger)
if err != nil {
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
csMetrics.StateSyncing.Set(1)
} else if blockSync {
csMetrics.BlockSyncing.Set(1)
}
consensusLogger := logger.With("module", "consensus")
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger,
config, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, eventBus, consensusLogger,
)
// Set up state sync reactor, and schedule a sync if requested.
@@ -275,22 +199,15 @@ func NewNode(config *cfg.Config,
proxyApp.Snapshot(),
proxyApp.Query(),
config.StateSync.TempDir,
ssMetrics,
)
stateSyncReactor.SetLogger(logger.With("module", "statesync"))
nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport.
transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)
// Setup Switch.
p2pLogger := logger.With("module", "p2p")
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
config, p2pMetrics, mempoolReactor, bcReactor,
stateSyncReactor, consensusReactor, evidenceReactor, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -303,7 +220,7 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
addrBook, err := createAddrBook(config, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
@@ -322,7 +239,8 @@ func NewNode(config *cfg.Config,
// Note we currently use the addrBook regardless at least for AddOurAddress
var pexReactor *pex.Reactor
if config.P2P.PexReactor {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactor = createPEXReactor(addrBook, config, logger)
sw.AddReactor("PEX", pexReactor)
}
// Add private IDs to addrbook to block those peers being added
@@ -333,11 +251,9 @@ func NewNode(config *cfg.Config,
genesisDoc: genDoc,
privValidator: privValidator,
transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
sw: sw,
addrBook: addrBook,
nodeKey: nodeKey,
stateStore: stateStore,
blockStore: blockStore,
@@ -347,7 +263,6 @@ func NewNode(config *cfg.Config,
consensusState: consensusState,
consensusReactor: consensusReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
evidencePool: evidencePool,
@@ -363,6 +278,15 @@ func NewNode(config *cfg.Config,
option(node)
}
for name, reactor := range node.customReactors {
if existingReactor := node.sw.Reactor(name); existingReactor != nil {
node.sw.Logger.Info("Replacing existing reactor with a custom one",
"name", name, "existing", existingReactor, "custom", reactor)
node.sw.RemoveReactor(name, existingReactor)
}
node.sw.AddReactor(name, reactor)
}
return node, nil
}
@@ -375,6 +299,77 @@ func (n *Node) OnStart() error {
time.Sleep(genTime.Sub(now))
}
// If an address is provided, listen on the socket for a connection from an
// external signing process. This will overwrite the privvalidator provided in the constructor
if n.config.PrivValidatorListenAddr != "" {
var err error
n.privValidator, err = createPrivValidatorSocketClient(n.config.PrivValidatorListenAddr, n.genesisDoc.ChainID, n.Logger)
if err != nil {
return fmt.Errorf("error with private validator socket client: %w", err)
}
}
pubKey, err := n.privValidator.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
state, err := n.stateStore.LoadFromDBOrGenesisDoc(n.genesisDoc)
if err != nil {
return fmt.Errorf("cannot load state: %w", err)
}
// Determine whether we should attempt state sync.
stateSync := n.config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
n.Logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if !stateSync {
if err := doHandshake(n.stateStore, state, n.blockStore, n.genesisDoc, n.eventBus, n.proxyApp, n.Logger); err != nil {
return err
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = n.stateStore.Load()
if err != nil {
return fmt.Errorf("cannot load state: %w", err)
}
}
nodeInfo, err := makeNodeInfo(n.config, n.nodeKey, n.txIndexer, n.genesisDoc, state)
if err != nil {
return err
}
for _, reactor := range n.customReactors {
for _, chDesc := range reactor.GetChannels() {
if !nodeInfo.HasChannel(chDesc.ID) {
nodeInfo.Channels = append(nodeInfo.Channels, chDesc.ID)
}
}
}
n.nodeInfo = nodeInfo
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
if err != nil {
return err
}
// Setup Transport.
transport, peerFilters := createTransport(n.config, n.nodeInfo, n.nodeKey, addr, n.proxyApp)
n.sw.SetTransport(transport)
n.sw.SetPeerFilters(peerFilters...)
n.sw.SetNodeInfo(n.nodeInfo)
n.sw.SetNodeKey(n.nodeKey)
// run pprof server if it is enabled
if n.config.RPC.IsPprofEnabled() {
n.pprofSrv = n.startPprofServer()
@@ -395,16 +390,13 @@ func (n *Node) OnStart() error {
n.rpcListeners = listeners
}
// Start the transport.
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(*addr); err != nil {
if err := n.eventBus.Start(); err != nil {
return err
}
n.isListening = true
if err := n.indexerService.Start(); err != nil {
return err
}
// Start the switch (the P2P server).
err = n.sw.Start()
@@ -412,23 +404,37 @@ func (n *Node) OnStart() error {
return err
}
n.isListening = true
// Always connect to persistent peers
err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent_peers field: %w", err)
}
// Run state sync
if n.stateSync {
bcR, ok := n.bcReactor.(blockSyncReactor)
if !ok {
return fmt.Errorf("this blocksync reactor does not support switching from state sync")
}
err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := n.config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, n.Logger)
// Run start up phases
if stateSync {
err := startStateSync(n.stateSyncReactor, n.bcReactor, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
if err != nil {
return fmt.Errorf("failed to start state sync: %w", err)
}
} else if blockSync {
err := n.bcReactor.SwitchToBlockSync(state)
if err != nil {
return fmt.Errorf("failed to start block sync: %w", err)
}
} else {
err := n.consensusReactor.SwitchToConsensus(state, false)
if err != nil {
return fmt.Errorf("failed to switch to consensus: %w", err)
}
}
return nil
@@ -453,10 +459,6 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}
n.isListening = false
// finally stop the listeners / external services
@@ -518,6 +520,8 @@ func (n *Node) ConfigureRPC() error {
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
ConsensusReactor: n.consensusReactor,
BlocksyncReactor: n.bcReactor,
StatesyncReactor: n.stateSyncReactor,
EventBus: n.eventBus,
Mempool: n.mempool,
@@ -651,7 +655,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}
// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on addr.
// collectors on the provided address.
func (n *Node) startPrometheusServer() *http.Server {
srv := &http.Server{
Addr: n.config.Instrumentation.PrometheusListenAddr,
@@ -672,7 +676,7 @@ func (n *Node) startPrometheusServer() *http.Server {
return srv
}
// starts a ppro
// starts a pprof server at the specified listen address
func (n *Node) startPprofServer() *http.Server {
srv := &http.Server{
Addr: n.config.RPC.PprofListenAddress,
@@ -682,7 +686,7 @@ func (n *Node) startPprofServer() *http.Server {
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
n.Logger.Error("pprof HTTP server ListenAndServe", "err", err)
n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
}
}()
return srv
@@ -793,7 +797,7 @@ func makeNodeInfo(
Network: genDoc.ChainID,
Version: version.TMCoreSemVer,
Channels: []byte{
bc.BlocksyncChannel,
blocksync.BlocksyncChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
mempl.MempoolChannel,
evidence.EvidenceChannel,

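As a rough sketch of the start-up phases Node.OnStart now selects between (state sync, block sync, or straight to consensus), with the reactors and flags assumed to be wired as in the diff:

```go
package example

import (
	bc "github.com/tendermint/tendermint/blocksync"
	cs "github.com/tendermint/tendermint/consensus"
	sm "github.com/tendermint/tendermint/state"
)

// Sketch only: mirrors the three-way branch at the end of Node.OnStart.
func startPhase(stateSync, blockSync bool, state sm.State, bcR *bc.Reactor, conR *cs.Reactor) error {
	if stateSync {
		// In the diff this calls startStateSync(...), which hands off to
		// block sync / consensus on its own once a snapshot is restored.
		return nil
	}
	if blockSync {
		return bcR.SwitchToBlockSync(state)
	}
	return conR.SwitchToConsensus(state, false)
}
```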
View File

@@ -123,6 +123,8 @@ func TestNodeSetAppVersion(t *testing.T) {
// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck
// default config uses the kvstore app
var appVersion = kvstore.ProtocolVersion
@@ -189,6 +191,8 @@ func TestNodeSetPrivValTCP(t *testing.T) {
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}
@@ -201,7 +205,7 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix
_, err := DefaultNewNode(config, log.TestingLogger())
assert.Error(t, err)
require.NoError(t, err)
}
func TestNodeSetPrivValIPC(t *testing.T) {
@@ -451,7 +455,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
RecvMessageCapacity: 100,
},
}
customBlocksyncReactor := p2pmock.NewReactor()
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
@@ -464,7 +467,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),
CustomReactors(map[string]p2p.Reactor{"FOO": cr}),
)
require.NoError(t, err)
@@ -475,9 +478,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.True(t, cr.IsRunning())
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
assert.True(t, customBlocksyncReactor.IsRunning())
assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC"))
channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)
assert.Contains(t, channels, cr.Channels[0].ID)


@@ -12,7 +12,7 @@ import (
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
bc "github.com/tendermint/tendermint/blocksync"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
@@ -99,20 +99,22 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
}
// MetricsProvider returns consensus, p2p, mempool, state, proxy, blocksync and statesync Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics)
// DefaultMetricsProvider returns Metrics built using the Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID),
blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics()
}
}
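For reference, a caller of the widened provider now unpacks seven values and threads the two new ones into the reactor constructors; a rough sketch with illustrative names (not the node's exact wiring):

csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)
// bsMetrics feeds the blocksync reactor, ssMetrics the statesync reactor
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, bsMetrics, logger)
if err != nil {
	return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}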
@@ -138,7 +140,7 @@ func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.Block
return
}
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
func createProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, metrics)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
@@ -147,16 +149,13 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L
return proxyApp, nil
}
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
func createEventBus(logger log.Logger) *types.EventBus {
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
return eventBus, nil
return eventBus
}
func createAndStartIndexerService(
func createIndexerService(
config *cfg.Config,
chainID string,
dbProvider DBProvider,
@@ -197,10 +196,6 @@ func createAndStartIndexerService(
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService.SetLogger(logger.With("module", "txindex"))
if err := indexerService.Start(); err != nil {
return nil, nil, nil, err
}
return indexerService, txIndexer, blockIndexer, nil
}
@@ -222,7 +217,7 @@ func doHandshake(
return nil
}
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger) {
// Log the version info.
logger.Info("Version info",
"tendermint_version", version.TMCoreSemVer,
@@ -243,9 +238,9 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
} else {
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
logger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
}
}
@@ -334,12 +329,12 @@ func createBlocksyncReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
blockSync bool,
metrics *blocksync.Metrics,
logger log.Logger,
) (bcReactor p2p.Reactor, err error) {
) (bcReactor *blocksync.Reactor, err error) {
switch config.BlockSync.Version {
case "v0":
bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync)
bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, metrics)
case "v1", "v2":
return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
default:
@@ -351,31 +346,28 @@ func createBlocksyncReactor(config *cfg.Config,
}
func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool mempl.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger,
) (*cs.Reactor, *cs.State) {
consensusState := cs.NewState(
config.Consensus,
state.Copy(),
blockExec,
blockStore,
mempool,
evidencePool,
cs.StateMetrics(csMetrics),
cs.WithMetrics(csMetrics),
)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
consensusReactor := cs.NewReactor(consensusState, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)
// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
@@ -387,6 +379,7 @@ func createTransport(
config *cfg.Config,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
netAddr *p2p.NetAddress,
proxyApp proxy.AppConns,
) (
*p2p.MultiplexTransport,
@@ -394,7 +387,7 @@ func createTransport(
) {
var (
mConnConfig = p2p.MConnConfig(config.P2P)
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, *netAddr, mConnConfig)
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
@@ -453,23 +446,17 @@ func createTransport(
}
func createSwitch(config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
peerFilters []p2p.PeerFilterFunc,
mempoolReactor p2p.Reactor,
bcReactor p2p.Reactor,
stateSyncReactor *statesync.Reactor,
consensusReactor *cs.Reactor,
evidenceReactor *evidence.Reactor,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
p2pLogger log.Logger,
) *p2p.Switch {
sw := p2p.NewSwitch(
config.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
@@ -477,15 +464,10 @@ func createSwitch(config *cfg.Config,
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
return sw
}
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
func createAddrBook(config *cfg.Config,
p2pLogger log.Logger, nodeKey *p2p.NodeKey,
) (pex.AddrBook, error) {
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
@@ -506,15 +488,10 @@ func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
}
addrBook.AddOurAddress(addr)
}
sw.SetAddrBook(addrBook)
return addrBook, nil
}
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger,
) *pex.Reactor {
func createPEXReactor(addrBook pex.AddrBook, config *cfg.Config, logger log.Logger) *pex.Reactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook,
&pex.ReactorConfig{
@@ -529,7 +506,6 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
})
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}
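Because these helpers no longer mutate the switch, the registration that used to live inside them moves to the caller; roughly (a sketch of the assumed caller side):

addrBook, err := createAddrBook(config, p2pLogger, nodeKey)
if err != nil {
	return nil, fmt.Errorf("could not create address book: %w", err)
}
sw.SetAddrBook(addrBook)
if config.P2P.PexReactor {
	sw.AddReactor("PEX", createPEXReactor(addrBook, config, logger))
}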
@@ -575,16 +551,17 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React
}
if blockSync {
// FIXME Very ugly to have these metrics bleed through here.
conR.Metrics.StateSyncing.Set(0)
conR.Metrics.BlockSyncing.Set(1)
err = bcR.SwitchToBlockSync(state)
if err != nil {
ssR.Logger.Error("Failed to switch to block sync", "err", err)
return
}
} else {
conR.SwitchToConsensus(state, true)
err := conR.SwitchToConsensus(state, true)
if err != nil {
ssR.Logger.Error("Failed to switch to consensus", "err", err)
return
}
}
}()
return nil
@@ -659,7 +636,7 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
return nil
}
func createAndStartPrivValidatorSocketClient(
func createPrivValidatorSocketClient(
listenAddr,
chainID string,
logger log.Logger,
@@ -674,12 +651,6 @@ func createAndStartPrivValidatorSocketClient(
return nil, fmt.Errorf("failed to start private validator: %w", err)
}
// try to get a pubkey from private validate first time
_, err = pvsc.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
const (
retries = 50 // 50 * 100ms = 5s total
timeout = 100 * time.Millisecond


@@ -1,6 +1,7 @@
package p2p
import (
"errors"
"fmt"
"math"
"sync"
@@ -107,7 +108,6 @@ type SwitchOption func(*Switch)
// NewSwitch creates a new Switch with the given config.
func NewSwitch(
cfg *config.P2PConfig,
transport Transport,
options ...SwitchOption,
) *Switch {
@@ -121,7 +121,6 @@ func NewSwitch(
dialing: cmap.NewCMap(),
reconnecting: cmap.NewCMap(),
metrics: NopMetrics(),
transport: transport,
filterTimeout: defaultFilterTimeout,
persistentPeersAddrs: make([]*NetAddress, 0),
unconditionalPeerIDs: make(map[ID]struct{}),
@@ -224,11 +223,30 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
sw.nodeKey = nodeKey
}
func (sw *Switch) SetPeerFilters(filters ...PeerFilterFunc) {
sw.peerFilters = filters
}
func (sw *Switch) SetTransport(transport Transport) {
if sw.IsRunning() {
panic("cannot set transport while switch is running")
}
sw.transport = transport
}
//---------------------------------------------------------------------
// Service start/stop
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
if sw.transport == nil {
return errors.New("transport not set")
}
if err := sw.transport.Start(); err != nil {
return err
}
// Start reactors
for _, reactor := range sw.reactors {
err := reactor.Start()
@@ -257,6 +275,10 @@ func (sw *Switch) OnStop() {
sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err)
}
}
if err := sw.transport.Stop(); err != nil {
sw.Logger.Error("closing transport", "err", err)
}
}
//---------------------------------------------------------------------
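With the transport removed from the constructor, a switch is now assembled in explicit steps; a minimal sketch of the new order (assumes cfg, nodeInfo, nodeKey, addr and logger already exist):

transport := p2p.NewMultiplexTransport(nodeInfo, nodeKey, *addr, p2p.MConnConfig(cfg.P2P))
sw := p2p.NewSwitch(cfg.P2P)
sw.SetLogger(logger)
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(&nodeKey)
sw.SetTransport(transport) // must happen before Start; panics if the switch is already running
if err := sw.Start(); err != nil { // OnStart starts the transport, then the reactors
	panic(err)
}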


@@ -22,6 +22,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p/conn"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
@@ -695,9 +696,16 @@ func TestSwitchAcceptRoutine(t *testing.T) {
}
type errorTransport struct {
service.BaseService
acceptErr error
}
func newErrTransport(acceptErr error) *errorTransport {
t := &errorTransport{acceptErr: acceptErr}
t.BaseService = *service.NewBaseService(nil, "Error Transport", t)
return t
}
func (et errorTransport) NetAddress() NetAddress {
panic("not implemented")
}
@@ -713,7 +721,9 @@ func (errorTransport) Cleanup(Peer) {
}
func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
sw := NewSwitch(cfg)
sw.SetTransport(newErrTransport(ErrFilterTimeout{}))
assert.NotPanics(t, func() {
err := sw.Start()
require.NoError(t, err)
@@ -721,7 +731,8 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
require.NoError(t, err)
})
sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
sw = NewSwitch(cfg)
sw.SetTransport(newErrTransport(ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}))
assert.NotPanics(t, func() {
err := sw.Start()
require.NoError(t, err)
@@ -730,7 +741,8 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
})
// TODO(melekes) check we remove our address from addrBook
sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
sw = NewSwitch(cfg)
sw.SetTransport(newErrTransport(ErrTransportClosed{}))
assert.NotPanics(t, func() {
err := sw.Start()
require.NoError(t, err)


@@ -194,14 +194,11 @@ func MakeSwitch(
panic(err)
}
t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg))
if err := t.Listen(*addr); err != nil {
panic(err)
}
t := NewMultiplexTransport(nodeInfo, nodeKey, *addr, MConnConfig(cfg))
// TODO: let the config be passed in?
sw := initSwitch(i, NewSwitch(cfg, t, opts...))
sw := initSwitch(i, NewSwitch(cfg, opts...))
sw.SetTransport(t)
sw.SetLogger(log.TestingLogger().With("switch", i))
sw.SetNodeKey(&nodeKey)


@@ -11,6 +11,7 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
@@ -59,6 +60,8 @@ type peerConfig struct {
// the transport. Each transport is also responsible for filtering establishing
// peers specific to its domain.
type Transport interface {
service.Service
// Listening address.
NetAddress() NetAddress
@@ -72,13 +75,6 @@ type Transport interface {
Cleanup(Peer)
}
// transportLifecycle bundles the methods for callers to control start and stop
// behavior.
type transportLifecycle interface {
Close() error
Listen(NetAddress) error
}
// ConnFilterFunc to be implemented by filter hooks after a new connection has
// been established. The set of existing connections is passed along together
// with all resolved IPs for the new connection.
@@ -136,6 +132,8 @@ func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
service.BaseService
netAddr NetAddress
listener net.Listener
maxIncomingConnections int // see MaxIncomingConnections
@@ -162,15 +160,15 @@ type MultiplexTransport struct {
// Test multiplexTransport for interface completeness.
var _ Transport = (*MultiplexTransport)(nil)
var _ transportLifecycle = (*MultiplexTransport)(nil)
// NewMultiplexTransport returns a tcp connected multiplexed peer.
func NewMultiplexTransport(
nodeInfo NodeInfo,
nodeKey NodeKey,
netAddr NetAddress,
mConfig conn.MConnConfig,
) *MultiplexTransport {
return &MultiplexTransport{
t := &MultiplexTransport{
acceptc: make(chan accept),
closec: make(chan struct{}),
dialTimeout: defaultDialTimeout,
@@ -179,9 +177,12 @@ func NewMultiplexTransport(
mConfig: mConfig,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
netAddr: netAddr,
conns: NewConnSet(),
resolver: net.DefaultResolver,
}
t.BaseService = *service.NewBaseService(nil, "P2P Transport", t)
return t
}
// NetAddress implements Transport.
@@ -234,20 +235,20 @@ func (mt *MultiplexTransport) Dial(
return p, nil
}
// Close implements transportLifecycle.
func (mt *MultiplexTransport) Close() error {
// OnStop implements Service.
func (mt *MultiplexTransport) OnStop() {
close(mt.closec)
if mt.listener != nil {
return mt.listener.Close()
if err := mt.listener.Close(); err != nil {
mt.Logger.Error("closing listener", "err", err)
}
}
return nil
}
// Listen implements transportLifecycle.
func (mt *MultiplexTransport) Listen(addr NetAddress) error {
ln, err := net.Listen("tcp", addr.DialString())
// OnStart implements Service.
func (mt *MultiplexTransport) OnStart() error {
ln, err := net.Listen("tcp", mt.netAddr.DialString())
if err != nil {
return err
}
@@ -256,7 +257,6 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
}
mt.netAddr = addr
mt.listener = ln
go mt.acceptPeers()
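Standalone use of the transport now follows the Service lifecycle rather than Listen/Close; a small sketch (nodeInfo and the listen address are placeholders):

nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()}
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), "127.0.0.1:26656"))
if err != nil {
	panic(err)
}
mt := p2p.NewMultiplexTransport(nodeInfo, nodeKey, *addr, conn.DefaultMConnConfig())
if err := mt.Start(); err != nil { // replaces mt.Listen(*addr)
	panic(err)
}
defer mt.Stop() //nolint:errcheck // replaces mt.Close(); listener errors are logged in OnStop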


@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
@@ -28,20 +29,26 @@ func emptyNodeInfo() NodeInfo {
func newMultiplexTransport(
nodeInfo NodeInfo,
nodeKey NodeKey,
netAddr NetAddress,
) *MultiplexTransport {
return NewMultiplexTransport(
nodeInfo, nodeKey, conn.DefaultMConnConfig(),
nodeInfo, nodeKey, netAddr, conn.DefaultMConnConfig(),
)
}
func TestTransportMultiplexConnFilter(t *testing.T) {
nodeKey := NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
id := nodeKey.ID()
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
nodeKey,
*addr,
)
id := mt.nodeKey.ID()
MultiplexTransportConnFilters(
func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
@@ -51,14 +58,8 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
},
)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
errc := make(chan error)
@@ -89,13 +90,18 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
}
func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
nodeKey := NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
id := nodeKey.ID()
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
nodeKey,
*addr,
)
id := mt.nodeKey.ID()
MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
MultiplexTransportConnFilters(
@@ -105,14 +111,9 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
},
)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
defer mt.Stop() //nolint:errcheck
errc := make(chan error)
go func() {
@@ -140,6 +141,10 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
mt := newMultiplexTransport(
testNodeInfo(
id, "transport",
@@ -147,26 +152,22 @@ func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
NodeKey{
PrivKey: pv,
},
*addr,
)
MultiplexTransportMaxIncomingConnections(0)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
const maxIncomingConns = 2
MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt)
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
// Connect more peers than max
for i := 0; i <= maxIncomingConns; i++ {
errc := make(chan error)
go testDialer(*laddr, errc)
go testDialer(t, *laddr, errc)
err = <-errc
if i < maxIncomingConns {
@@ -198,7 +199,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
// Setup dialers.
for i := 0; i < nDialers; i++ {
go testDialer(*laddr, errc)
go testDialer(t, *laddr, errc)
}
// Catch connection errors.
@@ -235,23 +236,26 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
}
}
if err := mt.Close(); err != nil {
if err := mt.Stop(); err != nil {
t.Errorf("close errored: %v", err)
}
}
func testDialer(dialAddr NetAddress, errc chan error) {
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
)
func testDialer(t *testing.T, dialAddr NetAddress, errc chan error) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
dialer := newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
*addr,
)
_, err := dialer.Dial(dialAddr, peerConfig{})
_, err = dialer.Dial(dialAddr, peerConfig{})
if err != nil {
errc <- err
return
@@ -319,15 +323,14 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
go func() {
<-slowc
var (
dialer = newMultiplexTransport(
fastNodeInfo,
NodeKey{
PrivKey: fastNodePV,
},
)
)
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer := newMultiplexTransport(
fastNodeInfo,
NodeKey{
PrivKey: fastNodePV,
},
*addr,
)
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -360,17 +363,16 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
errc := make(chan error)
go func() {
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
)
)
pv := ed25519.GenPrivKey()
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer := newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
*addr,
)
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -401,6 +403,7 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
errc := make(chan error)
go func() {
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer := newMultiplexTransport(
testNodeInfo(
PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer",
@@ -408,8 +411,8 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
*addr,
)
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -437,18 +440,16 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
func TestTransportMultiplexDialRejectWrongID(t *testing.T) {
mt := testSetupMultiplexTransport(t)
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
)
)
pv := ed25519.GenPrivKey()
wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey())
addr := NewNetAddress(wrongID, mt.listener.Addr())
dialer := newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
*addr,
)
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -471,14 +472,15 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) {
go func() {
var (
pv = ed25519.GenPrivKey()
addr = NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer = newMultiplexTransport(
testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"),
NodeKey{
PrivKey: pv,
},
*addr,
)
)
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -622,11 +624,16 @@ func TestTransportHandshake(t *testing.T) {
}
func TestTransportAddChannel(t *testing.T) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
PrivKey: pv,
},
*addr,
)
testChannel := byte(0x01)
@@ -638,27 +645,27 @@ func TestTransportAddChannel(t *testing.T) {
// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
mt = newMultiplexTransport(
testNodeInfo(
id, "transport",
),
NodeKey{
PrivKey: pv,
},
)
)
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
mt := newMultiplexTransport(
testNodeInfo(
id, "transport",
),
NodeKey{
PrivKey: pv,
},
*addr,
)
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
t.Cleanup(func() {
mt.Stop() //nolint:errcheck
})
// give the listener some time to get ready
time.Sleep(20 * time.Millisecond)


@@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
@@ -16,6 +17,7 @@ import (
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/statesync"
"github.com/tendermint/tendermint/types"
)
@@ -91,6 +93,8 @@ type Environment struct {
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
ConsensusReactor *consensus.Reactor
BlocksyncReactor *blocksync.Reactor
StatesyncReactor *statesync.Reactor
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool
@@ -199,9 +203,5 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
}
func latestUncommittedHeight() int64 {
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
}
return env.BlockStore.Height() + 1
}


@@ -51,6 +51,16 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
votingPower = val.VotingPower
}
phase := "initializing"
switch {
case env.StatesyncReactor.IsSyncing():
phase = "statesync"
case env.BlocksyncReactor.IsSyncing():
phase = "blocksync"
case env.ConsensusReactor.IsConsensusRunning():
phase = "consensus"
}
result := &ctypes.ResultStatus{
NodeInfo: env.P2PTransport.NodeInfo().(p2p.DefaultNodeInfo),
SyncInfo: ctypes.SyncInfo{
@@ -62,7 +72,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
EarliestAppHash: earliestAppHash,
EarliestBlockHeight: earliestBlockHeight,
EarliestBlockTime: time.Unix(0, earliestBlockTimeNano),
CatchingUp: env.ConsensusReactor.WaitSync(),
Phase: phase,
},
ValidatorInfo: ctypes.ValidatorInfo{
Address: env.PubKey.Address(),


@@ -86,7 +86,9 @@ type SyncInfo struct {
EarliestBlockHeight int64 `json:"earliest_block_height"`
EarliestBlockTime time.Time `json:"earliest_block_time"`
CatchingUp bool `json:"catching_up"`
// Phase indicates which process is currently advancing the node's state:
// either statesync, blocksync or consensus
Phase string `json:"phase"`
}
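Clients can read the new field from the /status endpoint; a small sketch using the Go RPC client in rpc/client/http (the endpoint is a placeholder):

c, err := rpchttp.New("tcp://127.0.0.1:26657", "/websocket")
if err != nil {
	panic(err)
}
status, err := c.Status(context.Background())
if err != nil {
	panic(err)
}
switch status.SyncInfo.Phase {
case "statesync", "blocksync":
	fmt.Println("still catching up via", status.SyncInfo.Phase)
case "consensus":
	fmt.Println("participating in consensus")
default:
	fmt.Println("initializing")
}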
// Info about the node's validator


@@ -81,6 +81,9 @@ type State struct {
// Copy makes a copy of the State for mutating.
func (state State) Copy() State {
if state.IsEmpty() {
return State{}
}
return State{
Version: state.Version,

statesync/metrics.gen.go (new file)

@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.
package statesync
import (
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
Syncing: discard.NewGauge(),
}
}

statesync/metrics.go (new file)

@@ -0,0 +1,19 @@
package statesync
import (
"github.com/go-kit/kit/metrics"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "statesync"
)
//go:generate go run ../scripts/metricsgen -struct=Metrics
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is state syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}
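Wiring mirrors the other metrics packages: pick Prometheus or no-op metrics once and hand them to the reactor; a sketch under the same assumptions as DefaultMetricsProvider above (config and chainID assumed in scope):

ssMetrics := statesync.NopMetrics()
if config.Instrumentation.Prometheus {
	ssMetrics = statesync.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", chainID)
}
stateSyncReactor := statesync.NewReactor(*config.StateSync, proxyApp.Snapshot(), proxyApp.Query(), config.StateSync.TempDir, ssMetrics)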


@@ -33,6 +33,7 @@ type Reactor struct {
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
tempDir string
metrics *Metrics
// This will only be set when a state sync is in progress. It is used to feed received
// snapshots and chunks into the sync.
@@ -46,12 +47,14 @@ func NewReactor(
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
tempDir string,
metrics *Metrics,
) *Reactor {
r := &Reactor{
cfg: cfg,
conn: conn,
connQuery: connQuery,
metrics: metrics,
}
r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
@@ -92,6 +95,13 @@ func (r *Reactor) AddPeer(peer p2p.Peer) {
}
}
// IsSyncing returns true if state sync is actively being used to restore a state
func (r *Reactor) IsSyncing() bool {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.syncer != nil
}
// RemovePeer implements p2p.Reactor.
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
r.mtx.RLock()
@@ -265,6 +275,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
}
r.metrics.Syncing.Set(1)
r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
r.mtx.Unlock()
@@ -284,6 +295,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.mtx.Lock()
r.syncer = nil
r.metrics.Syncing.Set(0)
r.mtx.Unlock()
return state, commit, err
}


@@ -71,7 +71,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
cfg := config.DefaultStateSyncConfig()
r := NewReactor(*cfg, conn, nil, "")
r := NewReactor(*cfg, conn, nil, "", NopMetrics())
err := r.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -161,7 +161,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
cfg := config.DefaultStateSyncConfig()
r := NewReactor(*cfg, conn, nil, "")
r := NewReactor(*cfg, conn, nil, "", NopMetrics())
err := r.Start()
require.NoError(t, err)
t.Cleanup(func() {


@@ -101,7 +101,7 @@ func (voteSet *VoteSet) ChainID() string {
// Implements VoteSetReader.
func (voteSet *VoteSet) GetHeight() int64 {
if voteSet == nil {
if voteSet.IsEmpty() {
return 0
}
return voteSet.height
@@ -109,7 +109,7 @@ func (voteSet *VoteSet) GetHeight() int64 {
// Implements VoteSetReader.
func (voteSet *VoteSet) GetRound() int32 {
if voteSet == nil {
if voteSet.IsEmpty() {
return -1
}
return voteSet.round
@@ -117,7 +117,7 @@ func (voteSet *VoteSet) GetRound() int32 {
// Implements VoteSetReader.
func (voteSet *VoteSet) Type() byte {
if voteSet == nil {
if voteSet.IsEmpty() {
return 0x00
}
return byte(voteSet.signedMsgType)
@@ -125,12 +125,16 @@ func (voteSet *VoteSet) Type() byte {
// Implements VoteSetReader.
func (voteSet *VoteSet) Size() int {
if voteSet == nil {
if voteSet.IsEmpty() {
return 0
}
return voteSet.valSet.Size()
}
func (voteSet *VoteSet) IsEmpty() bool {
return voteSet == nil || voteSet.height == 0
}
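The nil checks can collapse into IsEmpty because Go permits calling a pointer-receiver method on a nil *VoteSet, and IsEmpty tests the receiver before touching any field; for illustration:

var vs *types.VoteSet       // nil
fmt.Println(vs.IsEmpty())   // true, no panic: the receiver is nil-checked first
fmt.Println(vs.GetHeight()) // 0, because GetHeight now delegates to IsEmpty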
// Returns added=true if vote is valid and new.
// Otherwise returns err=ErrVote[
//