From 933256c862107d87ad78b8558b7aed14e1c296c4 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 1 Nov 2022 11:26:46 +0100 Subject: [PATCH] node start up phases --- blocksync/metrics.gen.go | 30 +++ blocksync/metrics.go | 19 ++ blocksync/pool.go | 29 +-- blocksync/reactor.go | 45 +++-- blocksync/reactor_test.go | 29 ++- consensus/byzantine_test.go | 18 +- consensus/common_test.go | 12 +- consensus/invalid_test.go | 2 +- consensus/metrics.gen.go | 14 -- consensus/metrics.go | 4 - consensus/reactor.go | 100 ++++------ consensus/reactor_test.go | 23 +-- consensus/replay_file.go | 8 +- consensus/replay_test.go | 56 +++--- consensus/state.go | 28 +-- consensus/types/height_vote_set.go | 4 +- consensus/types/height_vote_set_test.go | 26 +-- consensus/wal_generator.go | 2 +- node/node.go | 242 ++++++++++++------------ node/node_test.go | 8 +- node/setup.go | 81 +++----- p2p/switch.go | 26 ++- p2p/switch_test.go | 18 +- p2p/test_util.go | 9 +- p2p/transport.go | 36 ++-- p2p/transport_test.go | 189 +++++++++--------- rpc/core/env.go | 8 +- rpc/core/status.go | 12 +- rpc/core/types/responses.go | 4 +- state/state.go | 3 + statesync/metrics.gen.go | 30 +++ statesync/metrics.go | 19 ++ statesync/reactor.go | 12 ++ statesync/reactor_test.go | 4 +- types/vote_set.go | 12 +- 35 files changed, 644 insertions(+), 518 deletions(-) create mode 100644 blocksync/metrics.gen.go create mode 100644 blocksync/metrics.go create mode 100644 statesync/metrics.gen.go create mode 100644 statesync/metrics.go diff --git a/blocksync/metrics.gen.go b/blocksync/metrics.gen.go new file mode 100644 index 000000000..1d093fb31 --- /dev/null +++ b/blocksync/metrics.gen.go @@ -0,0 +1,30 @@ +// Code generated by metricsgen. DO NOT EDIT. + +package blocksync + +import ( + "github.com/go-kit/kit/metrics/discard" + prometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "syncing", + Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.", + }, labels).With(labelsAndValues...), + } +} + +func NopMetrics() *Metrics { + return &Metrics{ + Syncing: discard.NewGauge(), + } +} diff --git a/blocksync/metrics.go b/blocksync/metrics.go new file mode 100644 index 000000000..78a6337b9 --- /dev/null +++ b/blocksync/metrics.go @@ -0,0 +1,19 @@ +package blocksync + +import ( + "github.com/go-kit/kit/metrics" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "blocksync" +) + +//go:generate go run ../scripts/metricsgen -struct=Metrics + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // Whether or not a node is block syncing. 1 if yes, 0 if no. + Syncing metrics.Gauge +} diff --git a/blocksync/pool.go b/blocksync/pool.go index 57ba94ce8..f547010f6 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -99,6 +99,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // OnStart implements service.Service by spawning requesters routine and recording // pool's start time. 
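The `Syncing` gauge defined above is constructed per chain like the repo's other metric sets. A minimal sketch of the wiring (the "tendermint" namespace and "chain_id" label follow the convention used by the metrics provider extended later in this patch; treat them as assumptions if your deployment configures these differently):

package example

import "github.com/tendermint/tendermint/blocksync"

// blocksyncMetrics returns real metrics when Prometheus is enabled and
// no-op metrics otherwise, mirroring node/setup.go's DefaultMetricsProvider.
func blocksyncMetrics(prometheusOn bool, chainID string) *blocksync.Metrics {
	if prometheusOn {
		return blocksync.PrometheusMetrics("tendermint", "chain_id", chainID)
	}
	return blocksync.NopMetrics()
}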
func (pool *BlockPool) OnStart() error {
+	if pool.height == 0 {
+		return errors.New("height not set")
+	}
 	go pool.makeRequestersRoutine()
 	pool.startTime = time.Now()
 	return nil
@@ -111,22 +114,19 @@ func (pool *BlockPool) makeRequestersRoutine() {
 			break
 		}
 
-		_, numPending, lenRequesters := pool.GetStatus()
-		switch {
-		case numPending >= maxPendingRequests:
+		height, maxPeerHeight, numPending, lenRequesters := pool.GetStatus()
+		if height >= maxPeerHeight ||
+			numPending >= maxPendingRequests ||
+			lenRequesters >= maxTotalRequesters {
 			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
 			// check for timed out peers
 			pool.removeTimedoutPeers()
-		case lenRequesters >= maxTotalRequesters:
-			// sleep for a bit.
-			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
-			pool.removeTimedoutPeers()
-		default:
-			// request for more blocks.
-			pool.makeNextRequester()
+			continue
 		}
+
+		// request more blocks.
+		pool.makeNextRequester()
 	}
 }
 
@@ -156,11 +156,11 @@
-// GetStatus returns pool's height, numPending requests and the number of
-// requesters.
-func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
+// GetStatus returns the pool's height, the maximum peer height reported so
+// far, the number of pending requests, and the number of requesters.
+func (pool *BlockPool) GetStatus() (height, maxPeerHeight int64, numPending int32, lenRequesters int) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
+	return pool.height, pool.maxPeerHeight, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
 }
 
 // IsCaughtUp returns true if this node is caught up, false - otherwise.
@@ -302,6 +302,7 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
 	}
 
 	if height > pool.maxPeerHeight {
+		pool.Logger.Info("new max peer height", "height", height)
 		pool.maxPeerHeight = height
 	}
 }
@@ -388,7 +389,7 @@ func (pool *BlockPool) makeNextRequester() {
 
 	err := request.Start()
 	if err != nil {
-		request.Logger.Error("Error starting request", "err", err)
+		pool.Logger.Error("Error starting request", "err", err)
 	}
 }
 
diff --git a/blocksync/reactor.go b/blocksync/reactor.go
index 09dd2ef90..98fe82b0a 100644
--- a/blocksync/reactor.go
+++ b/blocksync/reactor.go
@@ -32,7 +32,7 @@ const (
 type consensusReactor interface {
 	// for when we switch from blocksync reactor and block sync to
 	// the consensus machine
-	SwitchToConsensus(state sm.State, skipWAL bool)
+	SwitchToConsensus(state sm.State, skipWAL bool) error
 }
 
 type peerError struct {
@@ -54,16 +54,15 @@ type Reactor struct {
 	blockExec *sm.BlockExecutor
 	store     *store.BlockStore
 	pool      *BlockPool
-	blockSync bool
 
 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
+
+	metrics *Metrics
 }
 
 // NewReactor returns new reactor instance.
-func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
-	blockSync bool) *Reactor {
-
+func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, metrics *Metrics) *Reactor {
 	if state.LastBlockHeight != store.Height() {
 		panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
 			store.Height()))
@@ -85,9 +84,9 @@
 		blockExec:  blockExec,
 		store:      store,
 		pool:       pool,
-		blockSync:  blockSync,
 		requestsCh: requestsCh,
 		errorsCh:   errorsCh,
+		metrics:    metrics,
 	}
 	bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
 	return bcR
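Since the reactor's OnStart becomes a no-op below, syncing is now started explicitly by the caller. A sketch of the new lifecycle from an embedder's point of view (startSyncing is a hypothetical helper; the real decision lives in node.OnStart later in this patch):

package example

import (
	"github.com/tendermint/tendermint/blocksync"
	sm "github.com/tendermint/tendermint/state"
)

// startSyncing drives the reactor into its block-sync phase once the caller
// has decided that is the phase to run; IsSyncing reports whether the pool
// is already active.
func startSyncing(bcR *blocksync.Reactor, state sm.State) error {
	if bcR.IsSyncing() {
		return nil
	}
	return bcR.SwitchToBlockSync(state)
}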
@@ -101,22 +100,22 @@ func (bcR *Reactor) SetLogger(l log.Logger) {
 
 // OnStart implements service.Service.
 func (bcR *Reactor) OnStart() error {
-	if bcR.blockSync {
-		err := bcR.pool.Start()
-		if err != nil {
-			return err
-		}
-		go bcR.poolRoutine(false)
-	}
 	return nil
 }
 
+// IsSyncing returns whether the node is using blocksync to advance heights.
+func (bcR *Reactor) IsSyncing() bool {
+	return bcR.pool.IsRunning()
+}
+
 // SwitchToBlockSync is called by the state sync reactor when switching to block sync.
 func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
-	bcR.blockSync = true
 	bcR.initialState = state
-
-	bcR.pool.height = state.LastBlockHeight + 1
+	if state.LastBlockHeight == 0 {
+		bcR.pool.height = state.InitialHeight
+	} else {
+		bcR.pool.height = state.LastBlockHeight + 1
+	}
 	err := bcR.pool.Start()
 	if err != nil {
 		return err
@@ -127,7 +126,7 @@ func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
 
 // OnStop implements service.Service.
 func (bcR *Reactor) OnStop() {
-	if bcR.blockSync {
+	if bcR.pool.IsRunning() {
 		if err := bcR.pool.Stop(); err != nil {
 			bcR.Logger.Error("Error stopping pool", "err", err)
 		}
@@ -253,6 +252,7 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 // Handle messages from the poolReactor telling the reactor what to do.
 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
 func (bcR *Reactor) poolRoutine(stateSynced bool) {
+	bcR.metrics.Syncing.Set(1)
 	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
 	defer trySyncTicker.Stop()
 
@@ -313,18 +313,23 @@ FOR_LOOP:
 	for {
 		select {
 		case <-switchToConsensusTicker.C:
-			height, numPending, lenRequesters := bcR.pool.GetStatus()
+			height, peerHeight, numPending, lenRequesters := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()
 			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
-				"outbound", outbound, "inbound", inbound)
+				"outbound", outbound, "inbound", inbound, "peerHeight", peerHeight)
 			if bcR.pool.IsCaughtUp() {
 				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
 				if err := bcR.pool.Stop(); err != nil {
 					bcR.Logger.Error("Error stopping pool", "err", err)
 				}
+
+				// TODO: node struct should be responsible for switching from block sync to
+				// consensus. It's messy to have to grab the consensus reactor from the switch.
 				conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
 				if ok {
-					conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
+					if err := conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced); err != nil {
+						bcR.Logger.Error("failed to switch to consensus", "err", err)
+					}
 				}
 				// else {
 				// should only happen during testing
@@ -426,6 +430,7 @@ FOR_LOOP:
 			break FOR_LOOP
 		}
 	}
+	bcR.metrics.Syncing.Set(0)
 }
 
 // BroadcastStatusRequest broadcasts `BlockStore` base and height.
diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go
index a88e05912..7b0b762e8 100644
--- a/blocksync/reactor_test.go
+++ b/blocksync/reactor_test.go
@@ -95,14 +95,6 @@ func newReactor(
 		mock.Anything,
 		mock.Anything).Return(nil)
 
-	// Make the Reactor itself.
-	// NOTE we have to create and commit the blocks first because
-	// pool.height is determined from the store.
- fastSync := true - db := dbm.NewMemDB() - stateStore = sm.NewStore(db, sm.StoreOptions{ - DiscardABCIResponses: false, - }) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mp, sm.EmptyEvidencePool{}, blockStore) if err = stateStore.Save(state); err != nil { @@ -145,7 +137,7 @@ func newReactor( blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor := NewReactor(state.Copy(), blockExec, blockStore, NopMetrics()) bcReactor.SetLogger(logger.With("module", "blocksync")) return ReactorPair{bcReactor, proxyApp} @@ -156,6 +148,9 @@ func TestNoBlockResponse(t *testing.T) { defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) + state, err := sm.MakeGenesisState(genDoc) + require.NoError(t, err) + maxBlockHeight := int64(65) reactorPairs := make([]ReactorPair, 2) @@ -169,6 +164,11 @@ func TestNoBlockResponse(t *testing.T) { }, p2p.Connect2Switches) + for _, reactor := range reactorPairs { + // turn on the syncing algorithm + reactor.reactor.SwitchToBlockSync(state) + } + defer func() { for _, r := range reactorPairs { err := r.reactor.Stop() @@ -218,6 +218,9 @@ func TestBadBlockStopsPeer(t *testing.T) { defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) + state, err := sm.MakeGenesisState(genDoc) + require.NoError(t, err) + maxBlockHeight := int64(148) // Other chain needs a different validator set @@ -244,6 +247,11 @@ func TestBadBlockStopsPeer(t *testing.T) { }, p2p.Connect2Switches) + for _, reactor := range reactorPairs { + // turn on the syncing algorithm + reactor.reactor.SwitchToBlockSync(state) + } + defer func() { for _, r := range reactorPairs { err := r.reactor.Stop() @@ -287,6 +295,9 @@ func TestBadBlockStopsPeer(t *testing.T) { p2p.Connect2Switches(switches, i, len(reactorPairs)-1) } + otherState, err := sm.MakeGenesisState(otherGenDoc) + lastReactorPair.reactor.SwitchToBlockSync(otherState) + for { if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { break diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index fe0c36a14..016a5f77f 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -45,6 +45,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { appFunc := newKVStore genDoc, privVals := randGenesisDoc(nValidators, false, 30) + state, err := sm.MakeGenesisState(genDoc) + require.NoError(t, err) css := make([]*State, nValidators) for i := 0; i < nValidators; i++ { @@ -53,7 +55,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { stateStore := sm.NewStore(stateDB, sm.StoreOptions{ DiscardABCIResponses: false, }) - state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc) thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) defer os.RemoveAll(thisConfig.RootDir) ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal @@ -101,7 +102,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) + cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy())) cs.SetLogger(cs.Logger) // set private validator pv := privVals[i] @@ -124,7 +125,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blocksSubs := 
make([]types.Subscription, 0) eventBuses := make([]*types.EventBus, nValidators) for i := 0; i < nValidators; i++ { - reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states + // Note, we dont start the consensus states + reactors[i] = NewReactor(css[i]) reactors[i].SetLogger(css[i].Logger) // eventBus is already started with the cs @@ -247,8 +249,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // start the consensus reactors for i := 0; i < nValidators; i++ { - s := reactors[i].conS.GetState() - reactors[i].SwitchToConsensus(s, false) + reactors[i].SwitchToConsensus(state.Copy(), false) } defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) @@ -307,7 +308,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { N := 4 logger := consensusLogger().With("test", "byzantine") app := newKVStore - css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app) + css, cleanup := randConsensusNet(t, N, "consensus_byzantine_test", newMockTickerFunc(false), app) defer cleanup() // give the byzantine validator a normal ticker @@ -356,7 +357,8 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) - conR := NewReactor(css[i], true) // so we don't start the consensus states + // Note, we don't start the consensus states + conR := NewReactor(css[i]) conR.SetLogger(logger.With("validator", i)) conR.SetEventBus(eventBus) @@ -572,7 +574,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) { // Send our state to peer. // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). - if !br.reactor.waitSync { + if br.reactor.conS.IsRunning() { br.reactor.sendNewRoundStepMessage(peer) } } diff --git a/consensus/common_test.go b/consensus/common_test.go index fee218198..54d7b9c61 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -440,7 +440,7 @@ func newStateWithConfigAndBlockStore( } blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) + cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy())) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) @@ -747,18 +747,17 @@ func consensusLogger() log.Logger { }).With("module", "consensus") } -func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker, +func randConsensusNet(t *testing.T, nValidators int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application, configOpts ...func(*cfg.Config)) ([]*State, cleanupFunc) { + t.Helper() genDoc, privVals := randGenesisDoc(nValidators, false, 30) + state, err := sm.MakeGenesisState(genDoc) + require.NoError(t, err) css := make([]*State, nValidators) logger := consensusLogger() configRootDirs := make([]string, 0, nValidators) for i := 0; i < nValidators; i++ { stateDB := dbm.NewMemDB() // each state needs its own db - stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardABCIResponses: false, - }) - state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc) thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) configRootDirs = append(configRootDirs, thisConfig.RootDir) for _, opt := range configOpts { @@ -772,6 +771,7 @@ func randConsensusNet(nValidators int, 
testName string, tickerFunc func() Timeou css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) + css[i].updateToState(state.Copy()) } return css, func() { for _, dir := range configRootDirs { diff --git a/consensus/invalid_test.go b/consensus/invalid_test.go index f96018157..fa70ff468 100644 --- a/consensus/invalid_test.go +++ b/consensus/invalid_test.go @@ -18,7 +18,7 @@ import ( // Ensure a testnet makes blocks func TestReactorInvalidPrecommit(t *testing.T) { N := 4 - css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) + css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) defer cleanup() for i := 0; i < 4; i++ { diff --git a/consensus/metrics.gen.go b/consensus/metrics.gen.go index 6f1699cdd..94ea5d224 100644 --- a/consensus/metrics.gen.go +++ b/consensus/metrics.gen.go @@ -118,18 +118,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "latest_block_height", Help: "The latest block height.", }, labels).With(labelsAndValues...), - BlockSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "block_syncing", - Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.", - }, labels).With(labelsAndValues...), - StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "state_syncing", - Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.", - }, labels).With(labelsAndValues...), BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -208,8 +196,6 @@ func NopMetrics() *Metrics { BlockSizeBytes: discard.NewGauge(), TotalTxs: discard.NewGauge(), CommittedHeight: discard.NewGauge(), - BlockSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), BlockParts: discard.NewCounter(), StepDurationSeconds: discard.NewHistogram(), BlockGossipPartsReceived: discard.NewCounter(), diff --git a/consensus/metrics.go b/consensus/metrics.go index e6a8f284a..f8262d391 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -61,10 +61,6 @@ type Metrics struct { TotalTxs metrics.Gauge // The latest block height. CommittedHeight metrics.Gauge `metrics_name:"latest_block_height"` - // Whether or not a node is block syncing. 1 if yes, 0 if no. - BlockSyncing metrics.Gauge - // Whether or not a node is state syncing. 1 if yes, 0 if no. - StateSyncing metrics.Gauge // Number of block parts transmitted by each peer. BlockParts metrics.Counter `metrics_labels:"peer_id"` diff --git a/consensus/reactor.go b/consensus/reactor.go index b0d3e3675..1410f2158 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -44,7 +44,6 @@ type Reactor struct { conS *State mtx tmsync.RWMutex - waitSync bool eventBus *types.EventBus rs *cstypes.RoundState @@ -55,12 +54,11 @@ type ReactorOption func(*Reactor) // NewReactor returns a new Reactor with the given // consensusState. 
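For embedders, the constructor change below is mechanical. A sketch of the migrated call site (buildConsensusReactor is an illustrative helper, not part of this patch):

package example

import cs "github.com/tendermint/tendermint/consensus"

// buildConsensusReactor shows the new call shape: the waitSync flag is gone,
// and the reactor stays idle until SwitchToConsensus is invoked during node
// start up.
func buildConsensusReactor(state *cs.State, metrics *cs.Metrics) *cs.Reactor {
	// previously: cs.NewReactor(state, true, cs.ReactorMetrics(metrics))
	return cs.NewReactor(state, cs.ReactorMetrics(metrics))
}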
-func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor { +func NewReactor(consensusState *State, options ...ReactorOption) *Reactor { conR := &Reactor{ - conS: consensusState, - waitSync: waitSync, - rs: consensusState.GetRoundState(), - Metrics: NopMetrics(), + conS: consensusState, + rs: consensusState.GetRoundState(), + Metrics: NopMetrics(), } conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) @@ -74,21 +72,12 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) // OnStart implements BaseService by subscribing to events, which later will be // broadcasted to other peers and starting state if we're not in block sync. func (conR *Reactor) OnStart() error { - conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync()) - // start routine that computes peer statistics for evaluating peer quality go conR.peerStatsRoutine() conR.subscribeToBroadcastEvents() go conR.updateRoundStateRoutine() - if !conR.WaitSync() { - err := conR.conS.Start() - if err != nil { - return err - } - } - return nil } @@ -96,47 +85,34 @@ func (conR *Reactor) OnStart() error { // state. func (conR *Reactor) OnStop() { conR.unsubscribeFromBroadcastEvents() - if err := conR.conS.Stop(); err != nil { - conR.Logger.Error("Error stopping consensus state", "err", err) - } - if !conR.WaitSync() { + if conR.conS.IsRunning() { + if err := conR.conS.Stop(); err != nil { + conR.Logger.Error("Error stopping consensus state", "err", err) + } conR.conS.Wait() } } +func (conR *Reactor) IsConsensusRunning() bool { + return conR.conS.IsRunning() +} + // SwitchToConsensus switches from block_sync mode to consensus mode. // It resets the state, turns off block_sync, and starts the consensus state-machine -func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) { +func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) error { conR.Logger.Info("SwitchToConsensus") // We have no votes, so reconstruct LastCommit from SeenCommit. - if state.LastBlockHeight > 0 { + if state.LastBlockHeight > state.InitialHeight { conR.conS.reconstructLastCommit(state) } - // NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a - // NewRoundStepMessage. conR.conS.updateToState(state) - conR.mtx.Lock() - conR.waitSync = false - conR.mtx.Unlock() - conR.Metrics.BlockSyncing.Set(0) - conR.Metrics.StateSyncing.Set(0) - if skipWAL { conR.conS.doWALCatchup = false } - err := conR.conS.Start() - if err != nil { - panic(fmt.Sprintf(`Failed to start consensus state: %v - -conS: -%+v - -conR: -%+v`, err, conR.conS, conR)) - } + return conR.conS.Start() } // GetChannels implements Reactor @@ -199,7 +175,7 @@ func (conR *Reactor) AddPeer(peer p2p.Peer) { // Send our state to peer. // If we're block_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). 
- if !conR.WaitSync() { + if conR.conS.IsRunning() { conR.sendNewRoundStepMessage(peer) } } @@ -304,7 +280,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } case DataChannel: - if conR.WaitSync() { + if !conR.conS.IsRunning() { conR.Logger.Info("Ignoring message received during sync", "msg", msg) return } @@ -323,7 +299,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } case VoteChannel: - if conR.WaitSync() { + if !conR.conS.IsRunning() { conR.Logger.Info("Ignoring message received during sync", "msg", msg) return } @@ -345,7 +321,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } case VoteSetBitsChannel: - if conR.WaitSync() { + if !conR.conS.IsRunning() { conR.Logger.Info("Ignoring message received during sync", "msg", msg) return } @@ -386,13 +362,6 @@ func (conR *Reactor) SetEventBus(b *types.EventBus) { conR.conS.SetEventBus(b) } -// WaitSync returns whether the consensus reactor is waiting for state/block sync. -func (conR *Reactor) WaitSync() bool { - conR.mtx.RLock() - defer conR.mtx.RUnlock() - return conR.waitSync -} - //-------------------------------------- // subscribeToBroadcastEvents subscribes for new round steps and votes @@ -519,6 +488,11 @@ OUTER_LOOP: if !peer.IsRunning() || !conR.IsRunning() { return } + if !conR.IsConsensusRunning() { + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + continue OUTER_LOOP + } + rs := conR.getRoundState() prs := ps.GetRoundState() @@ -646,7 +620,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt } return } - // logger.Info("No parts to send in catch-up, sleeping") + time.Sleep(conR.conS.config.PeerGossipSleepDuration) } @@ -656,12 +630,16 @@ func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { // Simple hack to throttle logs upon sleep. var sleeping = 0 -OUTER_LOOP: for { // Manage disconnects from self or peer. if !peer.IsRunning() || !conR.IsRunning() { return } + if !conR.IsConsensusRunning() { + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + continue + } + rs := conR.getRoundState() prs := ps.GetRoundState() @@ -672,14 +650,11 @@ OUTER_LOOP: sleeping = 0 } - // logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round, - // "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step) - // If height matches, then send LastCommit, Prevotes, Precommits. 
 		if rs.Height == prs.Height {
 			heightLogger := logger.With("height", prs.Height)
 			if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
-				continue OUTER_LOOP
+				continue
 			}
 		}
 
 		// Special catchup logic.
 		// If peer is lagging by height 1, send LastCommit.
 		if prs.Height != 0 && rs.Height == prs.Height+1 {
 			if ps.PickSendVote(rs.LastCommit) {
 				logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
-				continue OUTER_LOOP
+				continue
 			}
 		}
@@ -701,7 +676,7 @@ OUTER_LOOP:
 			if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
 				if ps.PickSendVote(commit) {
 					logger.Debug("Picked Catchup commit to send", "height", prs.Height)
-					continue OUTER_LOOP
+					continue
 				}
 			}
 		}
@@ -718,7 +693,7 @@ OUTER_LOOP:
 			}
 
 			time.Sleep(conR.conS.config.PeerGossipSleepDuration)
-			continue OUTER_LOOP
+			continue
 		}
 	}
@@ -792,6 +767,11 @@ OUTER_LOOP:
 			return
 		}
 
+		if !conR.IsConsensusRunning() {
+			time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
+			continue OUTER_LOOP
+		}
+
 		// Maybe send Height/Round/Prevotes
 		{
 			rs := conR.getRoundState()
@@ -863,8 +843,6 @@ OUTER_LOOP:
 			}
 		}
 
 		time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
-
-		continue OUTER_LOOP
 	}
 }
diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go
index 303f5e6e2..e4e140bd3 100644
--- a/consensus/reactor_test.go
+++ b/consensus/reactor_test.go
@@ -54,9 +54,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
 	blocksSubs := make([]types.Subscription, 0)
 	eventBuses := make([]*types.EventBus, n)
 	for i := 0; i < n; i++ {
-		/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
-		if err != nil {	t.Fatal(err)}*/
-		reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
+		// Note, we don't start the consensus states
+		reactors[i] = NewReactor(css[i])
 		reactors[i].SetLogger(css[i].Logger)
 
 		// eventBus is already started with the cs
@@ -112,7 +111,7 @@ func stopConsensusNet(logger log.Logger, reactors []*Reactor, eventBuses []*type
 // Ensure a testnet makes blocks
 func TestReactorBasic(t *testing.T) {
 	N := 4
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	defer cleanup()
 	reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
 	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -134,6 +133,8 @@ func TestReactorWithEvidence(t *testing.T) {
 	// css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	genDoc, privVals := randGenesisDoc(nValidators, false, 30)
+	state, err := sm.MakeGenesisState(genDoc)
+	require.NoError(t, err)
 	css := make([]*State, nValidators)
 	logger := consensusLogger()
 	for i := 0; i < nValidators; i++ {
 		stateDB := dbm.NewMemDB()
 		stateStore := sm.NewStore(stateDB, sm.StoreOptions{
 			DiscardABCIResponses: false,
 		})
-		state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
 		thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
 		defer os.RemoveAll(thisConfig.RootDir)
 		ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -202,7 +202,7 @@ func TestReactorWithEvidence(t *testing.T) {
 		// Make State
 		blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
-		cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
+		cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool2, WithState(state.Copy()))
 		cs.SetLogger(log.TestingLogger().With("module",
"consensus")) cs.SetPrivValidator(pv) @@ -236,7 +236,7 @@ func TestReactorWithEvidence(t *testing.T) { // Ensure a testnet makes blocks when there are txs func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { N := 4 - css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore, + css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore, func(c *cfg.Config) { c.Consensus.CreateEmptyBlocks = false }) @@ -257,7 +257,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) { N := 1 - css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) + css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) defer cleanup() reactors, _, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) @@ -280,7 +280,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) { func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) { N := 1 - css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) + css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) defer cleanup() reactors, _, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) @@ -303,7 +303,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) { // Test we record stats about votes and block parts from other peers. func TestReactorRecordsVotesAndBlockParts(t *testing.T) { N := 4 - css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) + css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore) defer cleanup() reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) @@ -329,6 +329,7 @@ func TestReactorVotingPowerChange(t *testing.T) { nVals := 4 logger := log.TestingLogger() css, cleanup := randConsensusNet( + t, nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), @@ -524,7 +525,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { // Check we can make blocks with skip_timeout_commit=false func TestReactorWithTimeoutCommit(t *testing.T) { N := 4 - css, cleanup := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore) + css, cleanup := randConsensusNet(t, N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore) defer cleanup() // override default SkipTimeoutCommit == true for tests for i := 0; i < N; i++ { diff --git a/consensus/replay_file.go b/consensus/replay_file.go index c342c32bd..31ec4d521 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -129,8 +129,8 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error } pb.cs.Wait() - newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, - pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) + newCS := NewState(pb.cs.config, pb.cs.blockExec, + pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, WithState(pb.genesisState.Copy())) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -332,8 +332,8 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig 
*cfg.ConsensusCo mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(csConfig, state.Copy(), blockExec, - blockStore, mempool, evpool) + consensusState := NewState(csConfig, blockExec, + blockStore, mempool, evpool, WithState(state.Copy())) consensusState.SetEventBus(eventBus) return consensusState diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 44bbe09bf..70b7edeb4 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -67,7 +67,8 @@ func TestMain(m *testing.M) { func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config, lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) { logger := log.TestingLogger() - state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile()) + state, err := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile()) + require.NoError(t, err) privValidator := loadPrivValidator(consensusReplayConfig) cs := newStateWithConfigAndBlockStore( consensusReplayConfig, @@ -81,7 +82,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi bytes, _ := os.ReadFile(cs.config.WalFile()) t.Logf("====== WAL: \n\r%X\n", bytes) - err := cs.Start() + err = cs.Start() require.NoError(t, err) defer func() { if err := cs.Stop(); err != nil { @@ -555,40 +556,40 @@ func TestSimulateValidatorsChange(t *testing.T) { // Sync from scratch func TestHandshakeReplayAll(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, 0, m, false) + testHandshakeReplay(t, 0, m, false) } for _, m := range modes { - testHandshakeReplay(t, config, 0, m, true) + testHandshakeReplay(t, 0, m, true) } } // Sync many, not from scratch func TestHandshakeReplaySome(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, 2, m, false) + testHandshakeReplay(t, 2, m, false) } for _, m := range modes { - testHandshakeReplay(t, config, 2, m, true) + testHandshakeReplay(t, 2, m, true) } } // Sync from lagging by one func TestHandshakeReplayOne(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, numBlocks-1, m, false) + testHandshakeReplay(t, numBlocks-1, m, false) } for _, m := range modes { - testHandshakeReplay(t, config, numBlocks-1, m, true) + testHandshakeReplay(t, numBlocks-1, m, true) } } // Sync from caught up func TestHandshakeReplayNone(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, numBlocks, m, false) + testHandshakeReplay(t, numBlocks, m, false) } for _, m := range modes { - testHandshakeReplay(t, config, numBlocks, m, true) + testHandshakeReplay(t, numBlocks, m, true) } } @@ -660,25 +661,27 @@ func tempWALWithData(data []byte) string { // Make some blocks. Start a fresh app and apply nBlocks blocks. 
// Then restart the app and sync it up with the remaining blocks
-func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) {
-	var chain []*types.Block
-	var commits []*types.Commit
-	var store *mockBlockStore
-	var stateDB dbm.DB
-	var genesisState sm.State
+func testHandshakeReplay(t *testing.T, nBlocks int, mode uint, testValidatorsChange bool) {
+	var (
+		chain        []*types.Block
+		commits      []*types.Commit
+		store        *mockBlockStore
+		stateDB      dbm.DB
+		genesisState sm.State
+		config       *cfg.Config
+	)
 	if testValidatorsChange {
-		testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
-		defer os.RemoveAll(testConfig.RootDir)
+		config = ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
+		defer os.RemoveAll(config.RootDir)
 		stateDB = dbm.NewMemDB()
 		genesisState = sim.GenesisState
-		config = sim.Config
 		chain = append([]*types.Block{}, sim.Chain...) // copy chain
 		commits = sim.Commits
 		store = newMockBlockStore(t, config, genesisState.ConsensusParams)
 	} else { // test single node
-		testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
-		defer os.RemoveAll(testConfig.RootDir)
+		config = ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
+		defer os.RemoveAll(config.RootDir)
 		walBody, err := WALWithNBlocks(t, numBlocks)
 		require.NoError(t, err)
 		walFile := tempWALWithData(walBody)
@@ -811,14 +814,11 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm
 	state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
 	validators := types.TM2PB.ValidatorUpdates(state.Validators)
-	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
+	_, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
 		Validators: validators,
-	}); err != nil {
-		panic(err)
-	}
-	if err := stateStore.Save(state); err != nil { // save height 1's validatorsInfo
-		panic(err)
-	}
+	})
+	require.NoError(t, err)
+	require.NoError(t, stateStore.Save(state))
 	switch mode {
 	case 0:
 		for i := 0; i < nBlocks; i++ {
diff --git a/consensus/state.go b/consensus/state.go
index b1b64d7ef..929ec575d 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -148,7 +148,6 @@ type StateOption func(*State)
 // NewState returns a new State.
 func NewState(
 	config *cfg.ConsensusConfig,
-	state sm.State,
 	blockExec *sm.BlockExecutor,
 	blockStore sm.BlockStore,
 	txNotifier txNotifier,
@@ -177,13 +176,6 @@ func NewState(
 	cs.doPrevote = cs.defaultDoPrevote
 	cs.setProposal = cs.defaultSetProposal
 
-	// We have no votes, so reconstruct LastCommit from SeenCommit.
-	if state.LastBlockHeight > 0 {
-		cs.reconstructLastCommit(state)
-	}
-
-	cs.updateToState(state)
-
 	// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
 
 	cs.BaseService = *service.NewBaseService(nil, "State", cs)
@@ -207,10 +199,20 @@ func (cs *State) SetEventBus(b *types.EventBus) {
 }
 
-// StateMetrics sets the metrics.
-func StateMetrics(metrics *Metrics) StateOption {
+// WithMetrics sets the metrics.
+func WithMetrics(metrics *Metrics) StateOption {
 	return func(cs *State) { cs.metrics = metrics }
 }
 
+// WithState is an option that sets the initial state on the consensus state machine.
+func WithState(state sm.State) StateOption {
+	return func(cs *State) {
+		if state.LastBlockHeight > 0 {
+			cs.reconstructLastCommit(state)
+		}
+		cs.updateToState(state)
+	}
+}
+
 // String returns a string.
 func (cs *State) String() string {
 	// better not to access shared variables
@@ -297,6 +298,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
 
 // OnStart loads the latest state via the WAL, and starts the timeout and
 // receive routines.
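With state removed from NewState's parameter list, the initial state and metrics now arrive as functional options, and OnStart (next hunk) refuses to run on an empty state. A usage sketch matching the call sites updated throughout this patch (buildConsensusState is an illustrative helper; the evidence pool is stubbed with sm.EmptyEvidencePool for self-containment):

package example

import (
	cfg "github.com/tendermint/tendermint/config"
	cs "github.com/tendermint/tendermint/consensus"
	mempl "github.com/tendermint/tendermint/mempool"
	sm "github.com/tendermint/tendermint/state"
)

// buildConsensusState keeps collaborators positional and supplies the initial
// state and metrics via options. WithState must be given before Start, since
// OnStart now rejects an empty state.
func buildConsensusState(
	conf *cfg.ConsensusConfig,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mp mempl.Mempool,
	state sm.State,
	metrics *cs.Metrics,
) *cs.State {
	return cs.NewState(conf, blockExec, blockStore, mp, sm.EmptyEvidencePool{},
		cs.WithState(state.Copy()),
		cs.WithMetrics(metrics),
	)
}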
func (cs *State) OnStart() error { + if cs.state.IsEmpty() { + return errors.New("no state to commence consensus on") + } + // We may set the WAL in testing before calling Start, so only OpenWAL if its // still the nilWAL. if _, ok := cs.wal.(nilWAL); ok { @@ -612,7 +617,7 @@ func (cs *State) updateToState(state sm.State) { // signal the new round step, because other services (eg. txNotifier) // depend on having an up-to-date peer state! if state.LastBlockHeight <= cs.state.LastBlockHeight { - cs.Logger.Debug( + cs.Logger.Info( "ignoring updateToState()", "new_height", state.LastBlockHeight+1, "old_height", cs.state.LastBlockHeight+1, @@ -2275,11 +2280,10 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header return nil } - // TODO: pass pubKey to signVote vote, err := cs.signVote(msgType, hash, header) if err == nil { cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""}) - cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) + cs.Logger.Info("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) return vote } diff --git a/consensus/types/height_vote_set.go b/consensus/types/height_vote_set.go index 6a5c0b495..29e64e319 100644 --- a/consensus/types/height_vote_set.go +++ b/consensus/types/height_vote_set.go @@ -121,7 +121,7 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, return } voteSet := hvs.getVoteSet(vote.Round, vote.Type) - if voteSet == nil { + if voteSet.IsEmpty() { if rndz := hvs.peerCatchupRounds[peerID]; len(rndz) < 2 { hvs.addRound(vote.Round) voteSet = hvs.getVoteSet(vote.Round, vote.Type) @@ -166,7 +166,7 @@ func (hvs *HeightVoteSet) POLInfo() (polRound int32, polBlockID types.BlockID) { func (hvs *HeightVoteSet) getVoteSet(round int32, voteType tmproto.SignedMsgType) *types.VoteSet { rvs, ok := hvs.roundVoteSets[round] if !ok { - return nil + return &types.VoteSet{} } switch voteType { case tmproto.PrevoteType: diff --git a/consensus/types/height_vote_set_test.go b/consensus/types/height_vote_set_test.go index ccbdbed9d..56de28c4f 100644 --- a/consensus/types/height_vote_set_test.go +++ b/consensus/types/height_vote_set_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/internal/test" @@ -30,30 +31,23 @@ func TestPeerCatchupRounds(t *testing.T) { vote999_0 := makeVoteHR(t, 1, 0, 999, privVals) added, err := hvs.AddVote(vote999_0, "peer1") - if !added || err != nil { - t.Error("Expected to successfully add vote from peer", added, err) - } + require.NoError(t, err) + require.True(t, added) vote1000_0 := makeVoteHR(t, 1, 0, 1000, privVals) added, err = hvs.AddVote(vote1000_0, "peer1") - if !added || err != nil { - t.Error("Expected to successfully add vote from peer", added, err) - } + require.NoError(t, err) + require.True(t, added) vote1001_0 := makeVoteHR(t, 1, 0, 1001, privVals) added, err = hvs.AddVote(vote1001_0, "peer1") - if err != ErrGotVoteFromUnwantedRound { - t.Errorf("expected GotVoteFromUnwantedRoundError, but got %v", err) - } - if added { - t.Error("Expected to *not* add vote from peer, too many catchup rounds.") - } + require.Error(t, err) + require.Equal(t, ErrGotVoteFromUnwantedRound, err) + require.False(t, added) added, err = hvs.AddVote(vote1001_0, "peer2") - if !added || err != nil { - t.Error("Expected to successfully add vote from another peer") - } - + 
require.NoError(t, err) + require.True(t, added) } func makeVoteHR(t *testing.T, height int64, valIndex, round int32, privVals []types.PrivValidator) *types.Vote { diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 96a0d485c..f58ee7843 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -86,7 +86,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) + consensusState := NewState(config.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy())) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil { diff --git a/node/node.go b/node/node.go index f84cfc243..644fcd638 100644 --- a/node/node.go +++ b/node/node.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" + "github.com/tendermint/tendermint/blocksync" bc "github.com/tendermint/tendermint/blocksync" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" @@ -58,11 +59,10 @@ type Node struct { // services eventBus *types.EventBus // pub/sub for services stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for block-syncing - mempoolReactor p2p.Reactor // for gossipping transactions + blockStore *store.BlockStore // store the blockchain to disk + bcReactor *blocksync.Reactor // for block-syncing + mempoolReactor p2p.Reactor // for gossipping transactions mempool mempl.Mempool - stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node stateSyncGenesis sm.State // provides the genesis state for state sync @@ -77,6 +77,7 @@ type Node struct { indexerService *txindex.IndexerService prometheusSrv *http.Server pprofSrv *http.Server + customReactors map[string]p2p.Reactor } // Option sets a parameter for the node. @@ -96,29 +97,7 @@ type Option func(*Node) // - STATESYNC func CustomReactors(reactors map[string]p2p.Reactor) Option { return func(n *Node) { - for name, reactor := range reactors { - if existingReactor := n.sw.Reactor(name); existingReactor != nil { - n.sw.Logger.Info("Replacing existing reactor with a custom one", - "name", name, "existing", existingReactor, "custom", reactor) - n.sw.RemoveReactor(name, existingReactor) - } - n.sw.AddReactor(name, reactor) - // register the new channels to the nodeInfo - // NOTE: This is a bit messy now with the type casting but is - // cleaned up in the following version when NodeInfo is changed from - // and interface to a concrete type - if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok { - for _, chDesc := range reactor.GetChannels() { - if !ni.HasChannel(chDesc.ID) { - ni.Channels = append(ni.Channels, chDesc.ID) - n.transport.AddChannel(chDesc.ID) - } - } - n.nodeInfo = ni - } else { - n.Logger.Error("Node info is not of type DefaultNodeInfo. 
Custom reactor channels can not be added.") - } - } + n.customReactors = reactors } } @@ -158,24 +137,21 @@ func NewNode(config *cfg.Config, return nil, err } - csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID) + csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID) // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). - proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics) + proxyApp, err := createProxyAppConns(clientCreator, logger, abciMetrics) if err != nil { - return nil, err + return nil, fmt.Errorf("error starting proxy app connections: %v", err) } // EventBus and IndexerService must be started before the handshake because // we might need to index the txs of the replayed block as this might not have happened // when the node stopped last time (i.e. the node stopped after it saved the block // but before it indexed the txs, or, endblocker panicked) - eventBus, err := createAndStartEventBus(logger) - if err != nil { - return nil, err - } + eventBus := createEventBus(logger) - indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, + indexerService, txIndexer, blockIndexer, err := createIndexerService(config, genDoc.ChainID, dbProvider, eventBus, logger) if err != nil { return nil, err @@ -185,47 +161,12 @@ func NewNode(config *cfg.Config, // external signing process. if config.PrivValidatorListenAddr != "" { // FIXME: we should start services inside OnStart - privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger) + privValidator, err = createPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger) if err != nil { return nil, fmt.Errorf("error with private validator socket client: %w", err) } } - pubKey, err := privValidator.GetPubKey() - if err != nil { - return nil, fmt.Errorf("can't get pubkey: %w", err) - } - - // Determine whether we should attempt state sync. - stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) - if stateSync && state.LastBlockHeight > 0 { - logger.Info("Found local state with non-zero height, skipping state sync") - stateSync = false - } - - // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, - // and replays any blocks as necessary to sync tendermint with the app. - consensusLogger := logger.With("module", "consensus") - if !stateSync { - if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { - return nil, err - } - - // Reload the state. It will have the Version.Consensus.App set by the - // Handshake, and may have other modifications as well (ie. depending on - // what happened during block replay). - state, err = stateStore.Load() - if err != nil { - return nil, fmt.Errorf("cannot load state: %w", err) - } - } - - // Determine whether we should do block sync. This must happen after the handshake, since the - // app may modify the validator set, specifying ourself as the only validator. - blockSync := config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey) - - logNodeStartupInfo(state, pubKey, logger, consensusLogger) - // Make MempoolReactor mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) @@ -247,21 +188,16 @@ func NewNode(config *cfg.Config, ) // Make BlocksyncReactor. 
Don't start block sync if we're doing a state sync first. - bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger) + bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, bsMetrics, logger) if err != nil { return nil, fmt.Errorf("could not create blocksync reactor: %w", err) } // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. - // FIXME We need to update metrics here, since other reactors don't have access to them. - if stateSync { - csMetrics.StateSyncing.Set(1) - } else if blockSync { - csMetrics.BlockSyncing.Set(1) - } + consensusLogger := logger.With("module", "consensus") consensusReactor, consensusState := createConsensusReactor( - config, state, blockExec, blockStore, mempool, evidencePool, - privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger, + config, blockExec, blockStore, mempool, evidencePool, + privValidator, csMetrics, eventBus, consensusLogger, ) // Set up state sync reactor, and schedule a sync if requested. @@ -273,22 +209,15 @@ func NewNode(config *cfg.Config, proxyApp.Snapshot(), proxyApp.Query(), config.StateSync.TempDir, + ssMetrics, ) stateSyncReactor.SetLogger(logger.With("module", "statesync")) - nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) - if err != nil { - return nil, err - } - - // Setup Transport. - transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) - // Setup Switch. p2pLogger := logger.With("module", "p2p") sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, - stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, + config, p2pMetrics, mempoolReactor, bcReactor, + stateSyncReactor, consensusReactor, evidenceReactor, p2pLogger, ) err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) @@ -301,7 +230,7 @@ func NewNode(config *cfg.Config, return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) } - addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + addrBook, err := createAddrBook(config, p2pLogger, nodeKey) if err != nil { return nil, fmt.Errorf("could not create addrbook: %w", err) } @@ -320,7 +249,8 @@ func NewNode(config *cfg.Config, // Note we currently use the addrBook regardless at least for AddOurAddress var pexReactor *pex.Reactor if config.P2P.PexReactor { - pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + pexReactor = createPEXReactor(addrBook, config, logger) + sw.AddReactor("PEX", pexReactor) } // Add private IDs to addrbook to block those peers being added @@ -331,11 +261,9 @@ func NewNode(config *cfg.Config, genesisDoc: genDoc, privValidator: privValidator, - transport: transport, - sw: sw, - addrBook: addrBook, - nodeInfo: nodeInfo, - nodeKey: nodeKey, + sw: sw, + addrBook: addrBook, + nodeKey: nodeKey, stateStore: stateStore, blockStore: blockStore, @@ -345,7 +273,6 @@ func NewNode(config *cfg.Config, consensusState: consensusState, consensusReactor: consensusReactor, stateSyncReactor: stateSyncReactor, - stateSync: stateSync, stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state pexReactor: pexReactor, evidencePool: evidencePool, @@ -361,6 +288,15 @@ func NewNode(config *cfg.Config, option(node) } + for name, reactor := range node.customReactors { + if existingReactor := node.sw.Reactor(name); existingReactor != nil { + 
node.sw.Logger.Info("Replacing existing reactor with a custom one", + "name", name, "existing", existingReactor, "custom", reactor) + node.sw.RemoveReactor(name, existingReactor) + } + node.sw.AddReactor(name, reactor) + } + return node, nil } @@ -373,6 +309,67 @@ func (n *Node) OnStart() error { time.Sleep(genTime.Sub(now)) } + pubKey, err := n.privValidator.GetPubKey() + if err != nil { + return fmt.Errorf("can't get pubkey: %w", err) + } + + state, err := n.stateStore.LoadFromDBOrGenesisDoc(n.genesisDoc) + if err != nil { + return fmt.Errorf("cannot load state: %w", err) + } + + // Determine whether we should attempt state sync. + stateSync := n.config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) + if stateSync && state.LastBlockHeight > 0 { + n.Logger.Info("Found local state with non-zero height, skipping state sync") + stateSync = false + } + + // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, + // and replays any blocks as necessary to sync tendermint with the app. + + if !stateSync { + if err := doHandshake(n.stateStore, state, n.blockStore, n.genesisDoc, n.eventBus, n.proxyApp, n.Logger); err != nil { + return err + } + + // Reload the state. It will have the Version.Consensus.App set by the + // Handshake, and may have other modifications as well (ie. depending on + // what happened during block replay). + state, err = n.stateStore.Load() + if err != nil { + return fmt.Errorf("cannot load state: %w", err) + } + } + + nodeInfo, err := makeNodeInfo(n.config, n.nodeKey, n.txIndexer, n.genesisDoc, state) + if err != nil { + return err + } + + for _, reactor := range n.customReactors { + for _, chDesc := range reactor.GetChannels() { + if !nodeInfo.HasChannel(chDesc.ID) { + nodeInfo.Channels = append(nodeInfo.Channels, chDesc.ID) + } + } + } + n.nodeInfo = nodeInfo + + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress)) + if err != nil { + return err + } + + // Setup Transport. + transport, peerFilters := createTransport(n.config, n.nodeInfo, n.nodeKey, addr, n.proxyApp) + + n.sw.SetTransport(transport) + n.sw.SetPeerFilters(peerFilters...) + n.sw.SetNodeInfo(n.nodeInfo) + n.sw.SetNodeKey(n.nodeKey) + // run pprof server if it is enabled if n.config.RPC.IsPprofEnabled() { n.pprofSrv = n.startPprofServer() @@ -393,16 +390,13 @@ func (n *Node) OnStart() error { n.rpcListeners = listeners } - // Start the transport. - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress)) - if err != nil { - return err - } - if err := n.transport.Listen(*addr); err != nil { + if err := n.eventBus.Start(); err != nil { return err } - n.isListening = true + if err := n.indexerService.Start(); err != nil { + return err + } // Start the switch (the P2P server). err = n.sw.Start() @@ -410,23 +404,37 @@ func (n *Node) OnStart() error { return err } + n.isListening = true + // Always connect to persistent peers err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) if err != nil { return fmt.Errorf("could not dial peers from persistent_peers field: %w", err) } - // Run state sync - if n.stateSync { - bcR, ok := n.bcReactor.(blockSyncReactor) - if !ok { - return fmt.Errorf("this blocksync reactor does not support switching from state sync") - } - err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider, + // Determine whether we should do block sync. 
This must happen after the handshake, since the + // app may modify the validator set, specifying ourself as the only validator. + blockSync := n.config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey) + + logNodeStartupInfo(state, pubKey, n.Logger) + + // Run start up phases + if stateSync { + err := startStateSync(n.stateSyncReactor, n.bcReactor, n.consensusReactor, n.stateSyncProvider, n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis) if err != nil { return fmt.Errorf("failed to start state sync: %w", err) } + } else if blockSync { + err := n.bcReactor.SwitchToBlockSync(state) + if err != nil { + return fmt.Errorf("failed to start block sync: %w", err) + } + } else { + err := n.consensusReactor.SwitchToConsensus(state, false) + if err != nil { + return fmt.Errorf("failed to switch to consensus: %w", err) + } } return nil @@ -451,10 +459,6 @@ func (n *Node) OnStop() { n.Logger.Error("Error closing switch", "err", err) } - if err := n.transport.Close(); err != nil { - n.Logger.Error("Error closing transport", "err", err) - } - n.isListening = false // finally stop the listeners / external services @@ -516,6 +520,8 @@ func (n *Node) ConfigureRPC() error { TxIndexer: n.txIndexer, BlockIndexer: n.blockIndexer, ConsensusReactor: n.consensusReactor, + BlocksyncReactor: n.bcReactor, + StatesyncReactor: n.stateSyncReactor, EventBus: n.eventBus, Mempool: n.mempool, @@ -649,7 +655,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { } // startPrometheusServer starts a Prometheus HTTP server, listening for metrics -// collectors on addr. +// collectors on the provided address. func (n *Node) startPrometheusServer() *http.Server { srv := &http.Server{ Addr: n.config.Instrumentation.PrometheusListenAddr, @@ -670,7 +676,7 @@ func (n *Node) startPrometheusServer() *http.Server { return srv } -// starts a ppro +// starts a pprof server at the specified listen address func (n *Node) startPprofServer() *http.Server { srv := &http.Server{ Addr: n.config.RPC.PprofListenAddress, diff --git a/node/node_test.go b/node/node_test.go index ee23892b1..cc1de540d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -122,6 +122,8 @@ func TestNodeSetAppVersion(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) + require.NoError(t, n.Start()) + defer n.Stop() // default config uses the kvstore app var appVersion = kvstore.ProtocolVersion @@ -427,7 +429,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { RecvMessageCapacity: 100, }, } - customBlocksyncReactor := p2pmock.NewReactor() nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) require.NoError(t, err) @@ -440,7 +441,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), log.TestingLogger(), - CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}), + CustomReactors(map[string]p2p.Reactor{"FOO": cr}), ) require.NoError(t, err) @@ -451,9 +452,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { assert.True(t, cr.IsRunning()) assert.Equal(t, cr, n.Switch().Reactor("FOO")) - assert.True(t, customBlocksyncReactor.IsRunning()) - assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC")) - channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels assert.Contains(t, channels, mempl.MempoolChannel) assert.Contains(t, channels, cr.Channels[0].ID) diff --git a/node/setup.go b/node/setup.go index e9449558e..6c74ffb45 100644 --- 
a/node/setup.go
+++ b/node/setup.go
@@ -12,7 +12,7 @@ import (
 	dbm "github.com/tendermint/tm-db"
 
 	abci "github.com/tendermint/tendermint/abci/types"
-	bc "github.com/tendermint/tendermint/blocksync"
+	"github.com/tendermint/tendermint/blocksync"
 	cfg "github.com/tendermint/tendermint/config"
 	cs "github.com/tendermint/tendermint/consensus"
 	"github.com/tendermint/tendermint/crypto"
@@ -99,20 +99,22 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
 }
 
 // MetricsProvider returns consensus, p2p, mempool, state, proxy, blocksync and statesync Metrics.
-type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
+type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics)
 
 // DefaultMetricsProvider returns Metrics built using the Prometheus client library
 // if Prometheus is enabled. Otherwise, it returns no-op Metrics.
 func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
-	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
+	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) {
 		if config.Prometheus {
 			return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
 				p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
 				mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
 				sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
-				proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
+				proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID),
+				blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
+				statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID)
 		}
-		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
+		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics()
 	}
 }
 
@@ -138,7 +140,7 @@ func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.Block
 	return
 }
 
-func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
+func createProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
 	proxyApp := proxy.NewAppConns(clientCreator, metrics)
 	proxyApp.SetLogger(logger.With("module", "proxy"))
 	if err := proxyApp.Start(); err != nil {
@@ -147,16 +149,13 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L
 	return proxyApp, nil
 }
 
-func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
+func createEventBus(logger log.Logger) *types.EventBus {
 	eventBus := types.NewEventBus()
 	eventBus.SetLogger(logger.With("module", "events"))
-	if err := eventBus.Start(); err != nil {
-		return nil, err
-	}
-	return eventBus, nil
+	return eventBus
 }
 
-func createAndStartIndexerService(
+func createIndexerService(
 	config *cfg.Config,
 	chainID string,
 	dbProvider DBProvider,
@@ -197,10 +196,6 @@ func createAndStartIndexerService(
 	indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
 	indexerService.SetLogger(logger.With("module", "txindex"))
 
-	if err := indexerService.Start(); err != nil {
-		return nil, nil, nil, err
-	}
-
 	return indexerService, txIndexer, blockIndexer, nil
 }
 
@@ -222,7 +217,7 @@
func doHandshake( return nil } -func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) { +func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger) { // Log the version info. logger.Info("Version info", "tendermint_version", version.TMCoreSemVer, @@ -243,9 +238,9 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL addr := pubKey.Address() // Log whether this node is a validator or an observer if state.Validators.HasAddress(addr) { - consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey) + logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey) } else { - consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) + logger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) } } @@ -334,12 +329,12 @@ func createBlocksyncReactor(config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore, - blockSync bool, + metrics *blocksync.Metrics, logger log.Logger, -) (bcReactor p2p.Reactor, err error) { +) (bcReactor *blocksync.Reactor, err error) { switch config.BlockSync.Version { case "v0": - bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync) + bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, metrics) case "v1", "v2": return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version) default: @@ -351,31 +346,28 @@ func createBlocksyncReactor(config *cfg.Config, } func createConsensusReactor(config *cfg.Config, - state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, mempool mempl.Mempool, evidencePool *evidence.Pool, privValidator types.PrivValidator, csMetrics *cs.Metrics, - waitSync bool, eventBus *types.EventBus, consensusLogger log.Logger, ) (*cs.Reactor, *cs.State) { consensusState := cs.NewState( config.Consensus, - state.Copy(), blockExec, blockStore, mempool, evidencePool, - cs.StateMetrics(csMetrics), + cs.WithMetrics(csMetrics), ) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) + consensusReactor := cs.NewReactor(consensusState, cs.ReactorMetrics(csMetrics)) consensusReactor.SetLogger(consensusLogger) // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor @@ -387,6 +379,7 @@ func createTransport( config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey *p2p.NodeKey, + netAddr *p2p.NetAddress, proxyApp proxy.AppConns, ) ( *p2p.MultiplexTransport, @@ -394,7 +387,7 @@ func createTransport( ) { var ( mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) + transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, *netAddr, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) @@ -453,23 +446,17 @@ func createTransport( } func createSwitch(config *cfg.Config, - transport p2p.Transport, p2pMetrics *p2p.Metrics, - peerFilters []p2p.PeerFilterFunc, mempoolReactor p2p.Reactor, bcReactor p2p.Reactor, stateSyncReactor *statesync.Reactor, consensusReactor *cs.Reactor, evidenceReactor *evidence.Reactor, - nodeInfo p2p.NodeInfo, - nodeKey *p2p.NodeKey, p2pLogger log.Logger, ) *p2p.Switch { sw := p2p.NewSwitch( config.P2P, - transport, 
p2p.WithMetrics(p2pMetrics), - p2p.SwitchPeerFilters(peerFilters...), ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -477,15 +464,10 @@ func createSwitch(config *cfg.Config, sw.AddReactor("CONSENSUS", consensusReactor) sw.AddReactor("EVIDENCE", evidenceReactor) sw.AddReactor("STATESYNC", stateSyncReactor) - - sw.SetNodeInfo(nodeInfo) - sw.SetNodeKey(nodeKey) - - p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) return sw } -func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, +func createAddrBook(config *cfg.Config, p2pLogger log.Logger, nodeKey *p2p.NodeKey, ) (pex.AddrBook, error) { addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) @@ -506,15 +488,10 @@ func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, } addrBook.AddOurAddress(addr) } - - sw.SetAddrBook(addrBook) - return addrBook, nil } -func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, - sw *p2p.Switch, logger log.Logger, -) *pex.Reactor { +func createPEXReactor(addrBook pex.AddrBook, config *cfg.Config, logger log.Logger) *pex.Reactor { // TODO persistent peers ? so we can have their DNS addrs saved pexReactor := pex.NewReactor(addrBook, &pex.ReactorConfig{ @@ -529,7 +506,6 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, }) pexReactor.SetLogger(logger.With("module", "pex")) - sw.AddReactor("PEX", pexReactor) return pexReactor } @@ -575,16 +551,17 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React } if blockSync { - // FIXME Very ugly to have these metrics bleed through here. - conR.Metrics.StateSyncing.Set(0) - conR.Metrics.BlockSyncing.Set(1) err = bcR.SwitchToBlockSync(state) if err != nil { ssR.Logger.Error("Failed to switch to block sync", "err", err) return } } else { - conR.SwitchToConsensus(state, true) + err := conR.SwitchToConsensus(state, true) + if err != nil { + ssR.Logger.Error("Failed to switch to consensus", "err", err) + return + } } }() return nil @@ -659,7 +636,7 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error { return nil } -func createAndStartPrivValidatorSocketClient( +func createPrivValidatorSocketClient( listenAddr, chainID string, logger log.Logger, diff --git a/p2p/switch.go b/p2p/switch.go index 884fd883e..09ad9daaf 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1,6 +1,7 @@ package p2p import ( + "errors" "fmt" "math" "sync" @@ -105,7 +106,6 @@ type SwitchOption func(*Switch) // NewSwitch creates a new Switch with the given config. func NewSwitch( cfg *config.P2PConfig, - transport Transport, options ...SwitchOption, ) *Switch { sw := &Switch{ @@ -117,7 +117,6 @@ func NewSwitch( dialing: cmap.NewCMap(), reconnecting: cmap.NewCMap(), metrics: NopMetrics(), - transport: transport, filterTimeout: defaultFilterTimeout, persistentPeersAddrs: make([]*NetAddress, 0), unconditionalPeerIDs: make(map[ID]struct{}), @@ -217,11 +216,30 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { sw.nodeKey = nodeKey } +func (sw *Switch) SetPeerFilters(filters ...PeerFilterFunc) { + sw.peerFilters = filters +} + +func (sw *Switch) SetTransport(transport Transport) { + if sw.IsRunning() { + panic("cannot set transport while switch is running") + } + sw.transport = transport +} + //--------------------------------------------------------------------- // Service start/stop // OnStart implements BaseService. 
It starts all the reactors and peers.
 func (sw *Switch) OnStart() error {
+	if sw.transport == nil {
+		return errors.New("transport not set")
+	}
+
+	if err := sw.transport.Start(); err != nil {
+		return err
+	}
+
 	// Start reactors
 	for _, reactor := range sw.reactors {
 		err := reactor.Start()
@@ -250,6 +268,10 @@ func (sw *Switch) OnStop() {
 			sw.Logger.Error("error while stopping reactor", "reactor", reactor, "error", err)
 		}
 	}
+
+	if err := sw.transport.Stop(); err != nil {
+		sw.Logger.Error("closing transport", "err", err)
+	}
 }
 
 //---------------------------------------------------------------------
diff --git a/p2p/switch_test.go b/p2p/switch_test.go
index 9d5466df7..74bb8960b 100644
--- a/p2p/switch_test.go
+++ b/p2p/switch_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/crypto/ed25519"
 	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/libs/service"
 	tmsync "github.com/tendermint/tendermint/libs/sync"
 	"github.com/tendermint/tendermint/p2p/conn"
 )
@@ -671,9 +672,16 @@ func TestSwitchAcceptRoutine(t *testing.T) {
 }
 
 type errorTransport struct {
+	service.BaseService
 	acceptErr error
 }
 
+func newErrTransport(acceptErr error) *errorTransport {
+	t := &errorTransport{acceptErr: acceptErr}
+	t.BaseService = *service.NewBaseService(nil, "Error Transport", t)
+	return t
+}
+
 func (et errorTransport) NetAddress() NetAddress {
 	panic("not implemented")
 }
@@ -689,7 +697,9 @@ func (errorTransport) Cleanup(Peer) {
 }
 
 func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
-	sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
+	sw := NewSwitch(cfg)
+	sw.SetTransport(newErrTransport(ErrFilterTimeout{}))
+
 	assert.NotPanics(t, func() {
 		err := sw.Start()
 		require.NoError(t, err)
 		err = sw.Stop()
 		require.NoError(t, err)
 	})
 
-	sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
+	sw = NewSwitch(cfg)
+	sw.SetTransport(newErrTransport(ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}))
 	assert.NotPanics(t, func() {
 		err := sw.Start()
 		require.NoError(t, err)
 		err = sw.Stop()
 		require.NoError(t, err)
 	})
 	// TODO(melekes) check we remove our address from addrBook
 
-	sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
+	sw = NewSwitch(cfg)
+	sw.SetTransport(newErrTransport(ErrTransportClosed{}))
 	assert.NotPanics(t, func() {
 		err := sw.Start()
 		require.NoError(t, err)
diff --git a/p2p/test_util.go b/p2p/test_util.go
index 4e56f0193..06d5156c0 100644
--- a/p2p/test_util.go
+++ b/p2p/test_util.go
@@ -192,14 +192,11 @@ func MakeSwitch(
 		panic(err)
 	}
 
-	t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg))
-
-	if err := t.Listen(*addr); err != nil {
-		panic(err)
-	}
+	t := NewMultiplexTransport(nodeInfo, nodeKey, *addr, MConnConfig(cfg))
 
 	// TODO: let the config be passed in?
-	sw := initSwitch(i, NewSwitch(cfg, t, opts...))
+	sw := initSwitch(i, NewSwitch(cfg, opts...))
+	sw.SetTransport(t)
 	sw.SetLogger(log.TestingLogger().With("switch", i))
 	sw.SetNodeKey(&nodeKey)
diff --git a/p2p/transport.go b/p2p/transport.go
index e6e19a901..40893144d 100644
--- a/p2p/transport.go
+++ b/p2p/transport.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/tendermint/tendermint/crypto"
 	"github.com/tendermint/tendermint/libs/protoio"
+	"github.com/tendermint/tendermint/libs/service"
 	"github.com/tendermint/tendermint/p2p/conn"
 	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
 )
@@ -56,6 +57,8 @@ type peerConfig struct {
 // the transport. Each transport is also responsible to filter establishing
 // peers specific to its domain.
 type Transport interface {
+	service.Service
+
 	// Listening address.
 	NetAddress() NetAddress
 
@@ -69,13 +72,6 @@ type Transport interface {
 	Cleanup(Peer)
 }
 
-// transportLifecycle bundles the methods for callers to control start and stop
-// behavior.
-type transportLifecycle interface {
-	Close() error
-	Listen(NetAddress) error
-}
-
// ConnFilterFunc to be implemented by filter hooks after a new connection has
 // been established. The set of existing connections is passed along together
 // with all resolved IPs for the new connection.
@@ -133,6 +129,8 @@ func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
 // MultiplexTransport accepts and dials tcp connections and upgrades them to
 // multiplexed peers.
 type MultiplexTransport struct {
+	service.BaseService
+
 	netAddr                NetAddress
 	listener               net.Listener
 	maxIncomingConnections int // see MaxIncomingConnections
@@ -159,15 +157,15 @@ type MultiplexTransport struct {
 // Test multiplexTransport for interface completeness.
 var _ Transport = (*MultiplexTransport)(nil)
-var _ transportLifecycle = (*MultiplexTransport)(nil)
 
 // NewMultiplexTransport returns a multiplex transport that dials and accepts
 // TCP connections.
 func NewMultiplexTransport(
 	nodeInfo NodeInfo,
 	nodeKey NodeKey,
+	netAddr NetAddress,
 	mConfig conn.MConnConfig,
 ) *MultiplexTransport {
-	return &MultiplexTransport{
+	t := &MultiplexTransport{
 		acceptc:     make(chan accept),
 		closec:      make(chan struct{}),
 		dialTimeout: defaultDialTimeout,
@@ -176,9 +174,12 @@ func NewMultiplexTransport(
 		mConfig:  mConfig,
 		nodeInfo: nodeInfo,
 		nodeKey:  nodeKey,
+		netAddr:  netAddr,
 		conns:    NewConnSet(),
 		resolver: net.DefaultResolver,
 	}
+	t.BaseService = *service.NewBaseService(nil, "P2P Transport", t)
+	return t
 }
 
 // NetAddress implements Transport.
@@ -231,20 +232,20 @@ func (mt *MultiplexTransport) Dial(
 	return p, nil
 }
 
-// Close implements transportLifecycle.
-func (mt *MultiplexTransport) Close() error {
+// OnStop implements Service.
+func (mt *MultiplexTransport) OnStop() {
 	close(mt.closec)
 
 	if mt.listener != nil {
-		return mt.listener.Close()
+		if err := mt.listener.Close(); err != nil {
+			mt.Logger.Error("closing listener", "err", err)
+		}
 	}
-
-	return nil
 }
 
-// Listen implements transportLifecycle.
-func (mt *MultiplexTransport) Listen(addr NetAddress) error {
-	ln, err := net.Listen("tcp", addr.DialString())
+// OnStart implements Service.
+func (mt *MultiplexTransport) OnStart() error { + ln, err := net.Listen("tcp", mt.netAddr.DialString()) if err != nil { return err } @@ -253,7 +254,6 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error { ln = netutil.LimitListener(ln, mt.maxIncomingConnections) } - mt.netAddr = addr mt.listener = ln go mt.acceptPeers() diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 90b074256..0271ffe01 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/p2p/conn" @@ -28,20 +29,26 @@ func emptyNodeInfo() NodeInfo { func newMultiplexTransport( nodeInfo NodeInfo, nodeKey NodeKey, + netAddr NetAddress, ) *MultiplexTransport { return NewMultiplexTransport( - nodeInfo, nodeKey, conn.DefaultMConnConfig(), + nodeInfo, nodeKey, netAddr, conn.DefaultMConnConfig(), ) } func TestTransportMultiplexConnFilter(t *testing.T) { + nodeKey := NodeKey{ + PrivKey: ed25519.GenPrivKey(), + } + id := nodeKey.ID() + addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) + require.NoError(t, err) + mt := newMultiplexTransport( emptyNodeInfo(), - NodeKey{ - PrivKey: ed25519.GenPrivKey(), - }, + nodeKey, + *addr, ) - id := mt.nodeKey.ID() MultiplexTransportConnFilters( func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, @@ -51,14 +58,8 @@ func TestTransportMultiplexConnFilter(t *testing.T) { }, )(mt) - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } + err = mt.Start() + require.NoError(t, err) errc := make(chan error) @@ -89,13 +90,18 @@ func TestTransportMultiplexConnFilter(t *testing.T) { } func TestTransportMultiplexConnFilterTimeout(t *testing.T) { + nodeKey := NodeKey{ + PrivKey: ed25519.GenPrivKey(), + } + id := nodeKey.ID() + addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) + require.NoError(t, err) + mt := newMultiplexTransport( emptyNodeInfo(), - NodeKey{ - PrivKey: ed25519.GenPrivKey(), - }, + nodeKey, + *addr, ) - id := mt.nodeKey.ID() MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt) MultiplexTransportConnFilters( @@ -105,14 +111,9 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) { }, )(mt) - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } + err = mt.Start() + require.NoError(t, err) + defer mt.Stop() errc := make(chan error) go func() { @@ -140,6 +141,10 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) { func TestTransportMultiplexMaxIncomingConnections(t *testing.T) { pv := ed25519.GenPrivKey() id := PubKeyToID(pv.PubKey()) + addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) + if err != nil { + t.Fatal(err) + } mt := newMultiplexTransport( testNodeInfo( id, "transport", @@ -147,26 +152,22 @@ func TestTransportMultiplexMaxIncomingConnections(t *testing.T) { NodeKey{ PrivKey: pv, }, + *addr, ) MultiplexTransportMaxIncomingConnections(0)(mt) - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } const maxIncomingConns = 2 MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt) - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } + err = mt.Start() + require.NoError(t, 
err) laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) // Connect more peers than max for i := 0; i <= maxIncomingConns; i++ { errc := make(chan error) - go testDialer(*laddr, errc) + go testDialer(t, *laddr, errc) err = <-errc if i < maxIncomingConns { @@ -198,7 +199,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) { // Setup dialers. for i := 0; i < nDialers; i++ { - go testDialer(*laddr, errc) + go testDialer(t, *laddr, errc) } // Catch connection errors. @@ -235,23 +236,26 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) { } } - if err := mt.Close(); err != nil { + if err := mt.Stop(); err != nil { t.Errorf("close errored: %v", err) } } -func testDialer(dialAddr NetAddress, errc chan error) { - var ( - pv = ed25519.GenPrivKey() - dialer = newMultiplexTransport( - testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName), - NodeKey{ - PrivKey: pv, - }, - ) +func testDialer(t *testing.T, dialAddr NetAddress, errc chan error) { + pv := ed25519.GenPrivKey() + id := PubKeyToID(pv.PubKey()) + addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) + require.NoError(t, err) + + dialer := newMultiplexTransport( + testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName), + NodeKey{ + PrivKey: pv, + }, + *addr, ) - _, err := dialer.Dial(dialAddr, peerConfig{}) + _, err = dialer.Dial(dialAddr, peerConfig{}) if err != nil { errc <- err return @@ -319,15 +323,14 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { go func() { <-slowc - var ( - dialer = newMultiplexTransport( - fastNodeInfo, - NodeKey{ - PrivKey: fastNodePV, - }, - ) - ) addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) + dialer := newMultiplexTransport( + fastNodeInfo, + NodeKey{ + PrivKey: fastNodePV, + }, + *addr, + ) _, err := dialer.Dial(*addr, peerConfig{}) if err != nil { @@ -360,17 +363,16 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) { errc := make(chan error) go func() { - var ( - pv = ed25519.GenPrivKey() - dialer = newMultiplexTransport( - testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty - NodeKey{ - PrivKey: pv, - }, - ) - ) + pv := ed25519.GenPrivKey() addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) + dialer := newMultiplexTransport( + testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty + NodeKey{ + PrivKey: pv, + }, + *addr, + ) _, err := dialer.Dial(*addr, peerConfig{}) if err != nil { @@ -401,6 +403,7 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) { errc := make(chan error) go func() { + addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) dialer := newMultiplexTransport( testNodeInfo( PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer", @@ -408,8 +411,8 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) { NodeKey{ PrivKey: ed25519.GenPrivKey(), }, + *addr, ) - addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) _, err := dialer.Dial(*addr, peerConfig{}) if err != nil { @@ -437,18 +440,16 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) { func TestTransportMultiplexDialRejectWrongID(t *testing.T) { mt := testSetupMultiplexTransport(t) - var ( - pv = ed25519.GenPrivKey() - dialer = newMultiplexTransport( - testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty - NodeKey{ - PrivKey: pv, - }, - ) - ) - + pv := ed25519.GenPrivKey() wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey()) addr := NewNetAddress(wrongID, mt.listener.Addr()) + dialer := newMultiplexTransport( + testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty + 
NodeKey{ + PrivKey: pv, + }, + *addr, + ) _, err := dialer.Dial(*addr, peerConfig{}) if err != nil { @@ -471,14 +472,15 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) { go func() { var ( pv = ed25519.GenPrivKey() + addr = NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) dialer = newMultiplexTransport( testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"), NodeKey{ PrivKey: pv, }, + *addr, ) ) - addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) _, err := dialer.Dial(*addr, peerConfig{}) if err != nil { @@ -622,11 +624,16 @@ func TestTransportHandshake(t *testing.T) { } func TestTransportAddChannel(t *testing.T) { + pv := ed25519.GenPrivKey() + id := PubKeyToID(pv.PubKey()) + addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) + require.NoError(t, err) mt := newMultiplexTransport( emptyNodeInfo(), NodeKey{ - PrivKey: ed25519.GenPrivKey(), + PrivKey: pv, }, + *addr, ) testChannel := byte(0x01) @@ -638,27 +645,27 @@ func TestTransportAddChannel(t *testing.T) { // create listener func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - mt = newMultiplexTransport( - testNodeInfo( - id, "transport", - ), - NodeKey{ - PrivKey: pv, - }, - ) - ) - + pv := ed25519.GenPrivKey() + id := PubKeyToID(pv.PubKey()) addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) if err != nil { t.Fatal(err) } + mt := newMultiplexTransport( + testNodeInfo( + id, "transport", + ), + NodeKey{ + PrivKey: pv, + }, + *addr, + ) - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } + err = mt.Start() + require.NoError(t, err) + t.Cleanup(func() { + mt.Stop() + }) // give the listener some time to get ready time.Sleep(20 * time.Millisecond) diff --git a/rpc/core/env.go b/rpc/core/env.go index e92319937..c11010160 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/tendermint/tendermint/blocksync" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" @@ -16,6 +17,7 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/statesync" "github.com/tendermint/tendermint/types" ) @@ -91,6 +93,8 @@ type Environment struct { TxIndexer txindex.TxIndexer BlockIndexer indexer.BlockIndexer ConsensusReactor *consensus.Reactor + BlocksyncReactor *blocksync.Reactor + StatesyncReactor *statesync.Reactor EventBus *types.EventBus // thread safe Mempool mempl.Mempool @@ -199,9 +203,5 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) { } func latestUncommittedHeight() int64 { - nodeIsSyncing := env.ConsensusReactor.WaitSync() - if nodeIsSyncing { - return env.BlockStore.Height() - } return env.BlockStore.Height() + 1 } diff --git a/rpc/core/status.go b/rpc/core/status.go index a2a70d95d..77af4bb2b 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -51,6 +51,16 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { votingPower = val.VotingPower } + phase := "initializing" + switch { + case env.StatesyncReactor.IsSyncing(): + phase = "statesync" + case env.BlocksyncReactor.IsSyncing(): + phase = "blocksync" + case env.ConsensusReactor.IsConsensusRunning(): + phase = "consensus" + } + result := &ctypes.ResultStatus{ NodeInfo: 
env.P2PTransport.NodeInfo().(p2p.DefaultNodeInfo),
 		SyncInfo: ctypes.SyncInfo{
@@ -62,7 +72,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
 			EarliestAppHash:     earliestAppHash,
 			EarliestBlockHeight: earliestBlockHeight,
 			EarliestBlockTime:   time.Unix(0, earliestBlockTimeNano),
-			CatchingUp:          env.ConsensusReactor.WaitSync(),
+			Phase:               phase,
 		},
 		ValidatorInfo: ctypes.ValidatorInfo{
 			Address: env.PubKey.Address(),
diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go
index 6da818890..232f2aa47 100644
--- a/rpc/core/types/responses.go
+++ b/rpc/core/types/responses.go
@@ -86,7 +86,9 @@ type SyncInfo struct {
 	EarliestBlockHeight int64     `json:"earliest_block_height"`
 	EarliestBlockTime   time.Time `json:"earliest_block_time"`
 
-	CatchingUp bool `json:"catching_up"`
+	// Phase indicates which process is advancing state:
+	// either statesync, blocksync or consensus.
+	Phase string `json:"phase"`
 }
 
 // Info about the node's validator
diff --git a/state/state.go b/state/state.go
index 51ce5a3f8..c8d40dab1 100644
--- a/state/state.go
+++ b/state/state.go
@@ -81,6 +81,9 @@ type State struct {
 
 // Copy makes a copy of the State for mutating.
 func (state State) Copy() State {
+	if state.IsEmpty() {
+		return State{}
+	}
 	return State{
 		Version: state.Version,
diff --git a/statesync/metrics.gen.go b/statesync/metrics.gen.go
new file mode 100644
index 000000000..1941c9270
--- /dev/null
+++ b/statesync/metrics.gen.go
@@ -0,0 +1,30 @@
+// Code generated by metricsgen. DO NOT EDIT.
+
+package statesync
+
+import (
+	"github.com/go-kit/kit/metrics/discard"
+	prometheus "github.com/go-kit/kit/metrics/prometheus"
+	stdprometheus "github.com/prometheus/client_golang/prometheus"
+)
+
+func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
+	labels := []string{}
+	for i := 0; i < len(labelsAndValues); i += 2 {
+		labels = append(labels, labelsAndValues[i])
+	}
+	return &Metrics{
+		Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "syncing",
+			Help:      "Whether or not a node is state syncing. 1 if yes, 0 if no.",
+		}, labels).With(labelsAndValues...),
+	}
+}
+
+func NopMetrics() *Metrics {
+	return &Metrics{
+		Syncing: discard.NewGauge(),
+	}
+}
diff --git a/statesync/metrics.go b/statesync/metrics.go
new file mode 100644
index 000000000..9a4d7fcef
--- /dev/null
+++ b/statesync/metrics.go
@@ -0,0 +1,19 @@
+package statesync
+
+import (
+	"github.com/go-kit/kit/metrics"
+)
+
+const (
+	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
+	// package.
+	MetricsSubsystem = "statesync"
+)
+
+//go:generate go run ../scripts/metricsgen -struct=Metrics
+
+// Metrics contains metrics exposed by this package.
+type Metrics struct {
+	// Whether or not a node is state syncing. 1 if yes, 0 if no.
+	Syncing metrics.Gauge
+}
diff --git a/statesync/reactor.go b/statesync/reactor.go
index 8434b6adf..13a2d2612 100644
--- a/statesync/reactor.go
+++ b/statesync/reactor.go
@@ -33,6 +33,7 @@ type Reactor struct {
 	conn      proxy.AppConnSnapshot
 	connQuery proxy.AppConnQuery
 	tempDir   string
+	metrics   *Metrics
 
 	// This will only be set when a state sync is in progress. It is used to feed received
 	// snapshots and chunks into the sync.
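For illustration, a minimal RPC-client sketch that reads the new phase field. This is not part of the patch: the listen address is a placeholder and the snippet assumes a node built with the SyncInfo change above.

// phase_example.go — client-side sketch only.
package main

import (
	"context"
	"fmt"
	"log"

	rpchttp "github.com/tendermint/tendermint/rpc/client/http"
)

func main() {
	// Placeholder address; point this at a node that includes the Phase field.
	c, err := rpchttp.New("tcp://127.0.0.1:26657", "/websocket")
	if err != nil {
		log.Fatal(err)
	}

	status, err := c.Status(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// Phase replaces the old CatchingUp boolean: one of "initializing",
	// "statesync", "blocksync" or "consensus".
	fmt.Printf("phase=%q height=%d\n", status.SyncInfo.Phase, status.SyncInfo.LatestBlockHeight)
}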
@@ -46,12 +47,14 @@ func NewReactor(
 	conn proxy.AppConnSnapshot,
 	connQuery proxy.AppConnQuery,
 	tempDir string,
+	metrics *Metrics,
 ) *Reactor {
 	r := &Reactor{
 		cfg:       cfg,
 		conn:      conn,
 		connQuery: connQuery,
+		metrics:   metrics,
 	}
 	r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
 
@@ -90,6 +93,13 @@ func (r *Reactor) AddPeer(peer p2p.Peer) {
 	}
 }
 
+// IsSyncing returns true if state sync is actively being used to restore state.
+func (r *Reactor) IsSyncing() bool {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+	return r.syncer != nil
+}
+
 // RemovePeer implements p2p.Reactor.
 func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
 	r.mtx.RLock()
@@ -263,6 +273,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 		r.mtx.Unlock()
 		return sm.State{}, nil, errors.New("a state sync is already in progress")
 	}
+	r.metrics.Syncing.Set(1)
 	r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
 	r.mtx.Unlock()
 
@@ -278,6 +289,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 
 	r.mtx.Lock()
 	r.syncer = nil
+	r.metrics.Syncing.Set(0)
 	r.mtx.Unlock()
 	return state, commit, err
 }
diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go
index 053b47ef5..a2add7e0f 100644
--- a/statesync/reactor_test.go
+++ b/statesync/reactor_test.go
@@ -62,7 +62,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
 			// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
 			cfg := config.DefaultStateSyncConfig()
-			r := NewReactor(*cfg, conn, nil, "")
+			r := NewReactor(*cfg, conn, nil, "", NopMetrics())
 			err := r.Start()
 			require.NoError(t, err)
 			t.Cleanup(func() {
@@ -140,7 +140,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
 			// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
 			cfg := config.DefaultStateSyncConfig()
-			r := NewReactor(*cfg, conn, nil, "")
+			r := NewReactor(*cfg, conn, nil, "", NopMetrics())
 			err := r.Start()
 			require.NoError(t, err)
 			t.Cleanup(func() {
diff --git a/types/vote_set.go b/types/vote_set.go
index 2fec82348..e4af953fd 100644
--- a/types/vote_set.go
+++ b/types/vote_set.go
@@ -101,7 +101,7 @@ func (voteSet *VoteSet) ChainID() string {
 
 // Implements VoteSetReader.
 func (voteSet *VoteSet) GetHeight() int64 {
-	if voteSet == nil {
+	if voteSet.IsEmpty() {
 		return 0
 	}
 	return voteSet.height
@@ -109,7 +109,7 @@ func (voteSet *VoteSet) GetHeight() int64 {
 
 // Implements VoteSetReader.
 func (voteSet *VoteSet) GetRound() int32 {
-	if voteSet == nil {
+	if voteSet.IsEmpty() {
 		return -1
 	}
 	return voteSet.round
@@ -117,7 +117,7 @@ func (voteSet *VoteSet) GetRound() int32 {
 
 // Implements VoteSetReader.
 func (voteSet *VoteSet) Type() byte {
-	if voteSet == nil {
+	if voteSet.IsEmpty() {
 		return 0x00
 	}
 	return byte(voteSet.signedMsgType)
@@ -125,12 +125,16 @@ func (voteSet *VoteSet) Type() byte {
 
 // Implements VoteSetReader.
 func (voteSet *VoteSet) Size() int {
-	if voteSet == nil {
+	if voteSet.IsEmpty() {
 		return 0
 	}
 	return voteSet.valSet.Size()
 }
 
+func (voteSet *VoteSet) IsEmpty() bool {
+	return voteSet == nil || voteSet.height == 0
+}
+
 // Returns added=true if vote is valid and new.
 // Otherwise returns err=ErrVote[
 //
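The IsEmpty guard above relies on Go's nil-receiver method semantics. A self-contained sketch of the pattern, with an illustrative type rather than the real types.VoteSet:

// nilreceiver_example.go — illustrative only.
package main

import "fmt"

type voteSet struct {
	height int64
}

// IsEmpty may be called on a nil *voteSet: Go permits calling a method on a
// nil pointer receiver, and the nil check short-circuits before any field
// of the receiver is read.
func (vs *voteSet) IsEmpty() bool {
	return vs == nil || vs.height == 0
}

func (vs *voteSet) GetHeight() int64 {
	if vs.IsEmpty() {
		return 0
	}
	return vs.height
}

func main() {
	var vs *voteSet // nil
	fmt.Println(vs.IsEmpty(), vs.GetHeight()) // prints: true 0 — no panic
}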