From 5df277cacaf27088863f69ab656ae968f1a53089 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 8 Apr 2022 13:19:28 +0000 Subject: [PATCH 1/3] build(deps): Bump github.com/lib/pq from 1.10.4 to 1.10.5 (#8283) Bumps [github.com/lib/pq](https://github.com/lib/pq) from 1.10.4 to 1.10.5.
Commits

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=github.com/lib/pq&package-manager=go_modules&previous-version=1.10.4&new-version=1.10.5)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) ---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
--- go.mod | 4 ++-- go.sum | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 49db1ba5e..b25b8ef39 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 - github.com/lib/pq v1.10.4 + github.com/lib/pq v1.10.5 github.com/libp2p/go-buffer-pool v0.0.2 github.com/mroth/weightedrand v0.4.1 github.com/oasisprotocol/curve25519-voi v0.0.0-20210609091139-0a56a4bca00b @@ -39,6 +39,7 @@ require ( require ( github.com/creachadair/atomicfile v0.2.4 + github.com/creachadair/taskgroup v0.3.2 github.com/golangci/golangci-lint v1.45.2 github.com/google/go-cmp v0.5.7 github.com/vektra/mockery/v2 v2.10.4 @@ -72,7 +73,6 @@ require ( github.com/charithe/durationcheck v0.0.9 // indirect github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect github.com/containerd/continuity v0.2.1 // indirect - github.com/creachadair/taskgroup v0.3.2 // indirect github.com/daixiang0/gci v0.3.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingaikin/go-header v0.4.3 // indirect diff --git a/go.sum b/go.sum index c0388b7af..4fc7fab1e 100644 --- a/go.sum +++ b/go.sum @@ -658,8 +658,9 @@ github.com/letsencrypt/pkcs11key/v4 v4.0.0/go.mod h1:EFUvBDay26dErnNb70Nd0/VW3tJ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.5 h1:J+gdV2cUmX7ZqL2B0lFcW0m+egaHC2V3lpO8nWxyYiQ= +github.com/lib/pq v1.10.5/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= From 90b951af7210a0275f79b3075e420c9a3f465a75 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 8 Apr 2022 11:00:58 -0400 Subject: [PATCH 2/3] node+statesync: normalize initialization (#8275) --- internal/statesync/reactor.go | 54 ++++++++++++------ internal/statesync/reactor_test.go | 5 +- node/node.go | 88 +++++++++++------------------- 3 files changed, 74 insertions(+), 73 deletions(-) diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 5ca1d0798..ea2cac4f4 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -143,6 +143,12 @@ type Reactor struct { peerEvents p2p.PeerEventSubscriber chCreator p2p.ChannelCreator sendBlockError func(context.Context, p2p.PeerError) error + postSyncHook func(context.Context, sm.State) error + + // when true, the reactor will, during startup perform a + // statesync for this node, and otherwise just provide + // snapshots to other nodes. + needsStateSync bool // Dispatcher is used to multiplex light block requests and responses over multiple // peers used by the p2p state provider and in reverse sync. @@ -171,7 +177,6 @@ type Reactor struct { // and querying, references to p2p Channels and a channel to listen for peer // updates on. Note, the reactor will close all p2p Channels when stopping. func NewReactor( - ctx context.Context, chainID string, initialHeight int64, cfg config.StateSyncConfig, @@ -184,23 +189,26 @@ func NewReactor( tempDir string, ssMetrics *Metrics, eventBus *eventbus.EventBus, + postSyncHook func(context.Context, sm.State) error, + needsStateSync bool, ) *Reactor { - r := &Reactor{ - logger: logger, - chainID: chainID, - initialHeight: initialHeight, - cfg: cfg, - conn: conn, - chCreator: channelCreator, - peerEvents: peerEvents, - tempDir: tempDir, - stateStore: stateStore, - blockStore: blockStore, - peers: newPeerList(), - providers: make(map[types.NodeID]*BlockProvider), - metrics: ssMetrics, - eventBus: eventBus, + logger: logger, + chainID: chainID, + initialHeight: initialHeight, + cfg: cfg, + conn: conn, + chCreator: channelCreator, + peerEvents: peerEvents, + tempDir: tempDir, + stateStore: stateStore, + blockStore: blockStore, + peers: newPeerList(), + providers: make(map[types.NodeID]*BlockProvider), + metrics: ssMetrics, + eventBus: eventBus, + postSyncHook: postSyncHook, + needsStateSync: needsStateSync, } r.BaseService = *service.NewBaseService(logger, "StateSync", r) @@ -300,6 +308,14 @@ func (r *Reactor) OnStart(ctx context.Context) error { go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh) go r.processPeerUpdates(ctx, r.peerEvents(ctx)) + if r.needsStateSync { + r.logger.Info("starting state sync") + if _, err := r.Sync(ctx); err != nil { + r.logger.Error("state sync failed; shutting down this node", "err", err) + return err + } + } + return nil } @@ -379,6 +395,12 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { } } + if r.postSyncHook != nil { + if err := r.postSyncHook(ctx, state); err != nil { + return sm.State{}, err + } + } + return state, nil } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index cef0735f2..86c84a9ed 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -155,7 +155,6 @@ func setup( logger := log.NewNopLogger() rts.reactor = NewReactor( - ctx, factory.DefaultTestChainID, 1, *cfg, @@ -167,7 +166,9 @@ func setup( rts.blockStore, "", m, - nil, // eventbus can be nil + nil, // eventbus can be nil + nil, // post-sync-hook + false, // run Sync during Start() ) rts.syncer = &syncer{ diff --git a/node/node.go b/node/node.go index 37b66c697..8a4da29f2 100644 --- a/node/node.go +++ b/node/node.go @@ -60,19 +60,17 @@ type nodeImpl struct { nodeKey types.NodeKey // our node privkey // services - eventSinks []indexer.EventSink - initialState sm.State - stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk - evPool *evidence.Pool - stateSync bool // whether the node should state sync on startup - stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - indexerService *indexer.Service - services []service.Service - rpcListeners []net.Listener // rpc servers - shutdownOps closer - rpcEnv *rpccore.Environment - prometheusSrv *http.Server + eventSinks []indexer.EventSink + initialState sm.State + stateStore sm.Store + blockStore *store.BlockStore // store the blockchain to disk + evPool *evidence.Pool + indexerService *indexer.Service + services []service.Service + rpcListeners []net.Listener // rpc servers + shutdownOps closer + rpcEnv *rpccore.Environment + prometheusSrv *http.Server } // newDefaultNode returns a Tendermint node with default settings for the @@ -331,13 +329,15 @@ func makeNode( nodeMetrics.consensus.BlockSyncing.Set(1) } + if cfg.P2P.PexReactor { + node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe)) + } + // Set up state sync reactor, and schedule a sync if requested. // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 - node.stateSync = stateSync - node.stateSyncReactor = statesync.NewReactor( - ctx, + node.services = append(node.services, statesync.NewReactor( genDoc.ChainID, genDoc.InitialHeight, *cfg.StateSync, @@ -350,11 +350,24 @@ func makeNode( cfg.StateSync.TempDir, nodeMetrics.statesync, eventBus, - ) + // the post-sync operation + func(ctx context.Context, state sm.State) error { + csReactor.SetStateSyncingMetrics(0) - if cfg.P2P.PexReactor { - node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe)) - } + // TODO: Some form of orchestrator is needed here between the state + // advancing reactors to be able to control which one of the three + // is running + // FIXME Very ugly to have these metrics bleed through here. + csReactor.SetBlockSyncingMetrics(1) + if err := bcReactor.SwitchToBlockSync(ctx, state); err != nil { + logger.Error("failed to switch to block sync", "err", err) + return err + } + + return nil + }, + stateSync, + )) if cfg.Mode == config.ModeValidator { node.rpcEnv.PubKey = pubKey @@ -481,40 +494,6 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { } } - if err := n.stateSyncReactor.Start(ctx); err != nil { - return err - } - - // Run state sync - // TODO: We shouldn't run state sync if we already have state that has a - // LastBlockHeight that is not InitialHeight - if n.stateSync { - // RUN STATE SYNC NOW: - // - // TODO: Eventually this should run as part of some - // separate orchestrator - n.logger.Info("starting state sync") - ssState, err := n.stateSyncReactor.Sync(ctx) - if err != nil { - n.logger.Error("state sync failed; shutting down this node", "err", err) - // stop the node - n.Stop() - return err - } - - n.rpcEnv.ConsensusReactor.SetStateSyncingMetrics(0) - - // TODO: Some form of orchestrator is needed here between the state - // advancing reactors to be able to control which one of the three - // is running - // FIXME Very ugly to have these metrics bleed through here. - n.rpcEnv.ConsensusReactor.SetBlockSyncingMetrics(1) - if err := n.rpcEnv.BlockSyncReactor.SwitchToBlockSync(ctx, ssState); err != nil { - n.logger.Error("failed to switch to block sync", "err", err) - return err - } - } - return nil } @@ -531,7 +510,6 @@ func (n *nodeImpl) OnStop() { reactor.Wait() } - n.stateSyncReactor.Wait() n.router.Wait() n.rpcEnv.IsListening = false From 3e3a9348180b9e3a8a48d887e3e5864624ed347b Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 8 Apr 2022 14:04:43 -0400 Subject: [PATCH 3/3] rpc: add more nil checks in the status end point (#8287) --- internal/rpc/core/status.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/internal/rpc/core/status.go b/internal/rpc/core/status.go index 46b8a6fcd..870c13424 100644 --- a/internal/rpc/core/status.go +++ b/internal/rpc/core/status.go @@ -77,14 +77,25 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er EarliestAppHash: earliestAppHash, EarliestBlockHeight: earliestBlockHeight, EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), - MaxPeerBlockHeight: env.BlockSyncReactor.GetMaxPeerBlockHeight(), - CatchingUp: env.ConsensusReactor.WaitSync(), - TotalSyncedTime: env.BlockSyncReactor.GetTotalSyncedTime(), - RemainingTime: env.BlockSyncReactor.GetRemainingSyncTime(), + // this should start as true, if consensus + // hasn't started yet, and then flip to false + // (or true,) depending on what's actually + // happening. + CatchingUp: true, }, ValidatorInfo: validatorInfo, } + if env.ConsensusReactor != nil { + result.SyncInfo.CatchingUp = env.ConsensusReactor.WaitSync() + } + + if env.BlockSyncReactor != nil { + result.SyncInfo.MaxPeerBlockHeight = env.BlockSyncReactor.GetMaxPeerBlockHeight() + result.SyncInfo.TotalSyncedTime = env.BlockSyncReactor.GetTotalSyncedTime() + result.SyncInfo.RemainingTime = env.BlockSyncReactor.GetRemainingSyncTime() + } + if env.StateSyncMetricer != nil { result.SyncInfo.TotalSnapshots = env.StateSyncMetricer.TotalSnapshots() result.SyncInfo.ChunkProcessAvgTime = env.StateSyncMetricer.ChunkProcessAvgTime() @@ -103,6 +114,9 @@ func (env *Environment) validatorAtHeight(h int64) *types.Validator { if err != nil { return nil } + if env.ConsensusState == nil { + return nil + } if env.PubKey == nil { return nil }