Merge branch 'master' into thane/7655-vote-extensions

This commit is contained in:
Thane Thomson
2022-04-05 14:13:20 -04:00
committed by GitHub
36 changed files with 574 additions and 622 deletions

View File

@@ -88,12 +88,11 @@ func newDefaultNode(
}
if cfg.Mode == config.ModeSeed {
return makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
logger,
)
}
pval, err := makeDefaultPrivval(cfg)
@@ -244,7 +243,7 @@ func makeNode(
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
@@ -257,24 +256,21 @@ func makeNode(
makeCloser(closers))
}
router, err := createRouter(ctx, logger, nodeMetrics.p2p, nodeInfo, nodeKey,
peerManager, cfg, proxyApp)
router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
makeCloser(closers))
}
mpReactor, mp, err := createMempoolReactor(ctx,
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
)
mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
peerManager.Subscribe, router.OpenChannel, peerManager.GetHeight)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx,
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
)
evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider,
stateStore, blockStore, peerManager.Subscribe, router.OpenChannel, nodeMetrics.evidence, eventBus)
closers = append(closers, edbCloser)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -295,11 +291,12 @@ func makeNode(
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
waitSync := stateSync || blockSync
csReactor, csState, err := createConsensusReactor(ctx,
cfg, stateStore, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
peerManager, router, logger,
privValidator, nodeMetrics.consensus, waitSync, eventBus,
peerManager, router.OpenChannel, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -307,23 +304,18 @@ func makeNode(
// Create the blockchain reactor. Note, we do not start block sync if we're
// doing a state sync first.
bcReactor, err := blocksync.NewReactor(ctx,
bcReactor := blocksync.NewReactor(
logger.With("module", "blockchain"),
stateStore,
blockExec,
blockStore,
csReactor,
router.OpenChannel,
peerManager.Subscribe(ctx),
peerManager.Subscribe,
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("could not create blocksync reactor: %w", err),
makeCloser(closers))
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
@@ -337,7 +329,7 @@ func makeNode(
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactor, err := statesync.NewReactor(
stateSyncReactor := statesync.NewReactor(
ctx,
genDoc.ChainID,
genDoc.InitialHeight,
@@ -345,23 +337,17 @@ func makeNode(
logger.With("module", "statesync"),
proxyApp,
router.OpenChannel,
peerManager.Subscribe(ctx),
peerManager.Subscribe,
stateStore,
blockStore,
cfg.StateSync.TempDir,
nodeMetrics.statesync,
eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
var pexReactor service.Service = service.NopService{}
if cfg.P2P.PexReactor {
pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
pexReactor = pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe)
}
node := &nodeImpl{
config: cfg,

View File

@@ -587,12 +587,12 @@ func TestNodeNewSeedNode(t *testing.T) {
logger := log.NewNopLogger()
ns, err := makeSeedNode(ctx,
ns, err := makeSeedNode(
logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
logger,
)
t.Cleanup(ns.Wait)
t.Cleanup(leaktest.CheckTimeout(t, time.Second))

View File

@@ -68,7 +68,7 @@ func New(
config.DefaultDBProvider,
logger)
case config.ModeSeed:
return makeSeedNode(ctx, conf, config.DefaultDBProvider, nodeKey, genProvider, logger)
return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
default:
return nil, fmt.Errorf("%q is not a valid mode", conf.Mode)
}

View File

@@ -40,12 +40,11 @@ type seedNodeImpl struct {
// makeSeedNode returns a new seed node, containing only p2p, pex reactor
func makeSeedNode(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
nodeKey types.NodeKey,
genesisDocProvider genesisDocProvider,
logger log.Logger,
) (service.Service, error) {
if !cfg.P2P.PexReactor {
return nil, errors.New("cannot run seed nodes with PEX disabled")
@@ -76,19 +75,13 @@ func makeSeedNode(
closer)
}
router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey,
peerManager, cfg, nil)
router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, closer)
}
node := &seedNodeImpl{
config: cfg,
logger: logger,
@@ -101,7 +94,7 @@ func makeSeedNode(
shutdownOps: closer,
pexReactor: pexReactor,
pexReactor: pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe),
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)

View File

@@ -169,14 +169,14 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolReactor(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
appClient abciclient.Client,
store sm.Store,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
peerEvents p2p.PeerEventSubscriber,
chCreator p2p.ChannelCreator,
peerHeight func(types.NodeID) int64,
) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool")
@@ -189,18 +189,14 @@ func createMempoolReactor(
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
reactor, err := mempool.NewReactor(
ctx,
reactor := mempool.NewReactor(
logger,
cfg.Mempool,
peerManager,
mp,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerEvents,
peerHeight,
)
if err != nil {
return nil, nil, err
}
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
@@ -210,14 +206,13 @@ func createMempoolReactor(
}
func createEvidenceReactor(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
store sm.Store,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
peerEvents p2p.PeerEventSubscriber,
chCreator p2p.ChannelCreator,
metrics *evidence.Metrics,
eventBus *eventbus.EventBus,
) (*evidence.Reactor, *evidence.Pool, closer, error) {
@@ -231,16 +226,12 @@ func createEvidenceReactor(
evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus)
evidenceReactor, err := evidence.NewReactor(
ctx,
evidenceReactor := evidence.NewReactor(
logger,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerEvents,
evidencePool,
)
if err != nil {
return nil, nil, dbCloser, fmt.Errorf("creating evidence reactor: %w", err)
}
return evidenceReactor, evidencePool, dbCloser, nil
}
@@ -258,7 +249,7 @@ func createConsensusReactor(
waitSync bool,
eventBus *eventbus.EventBus,
peerManager *p2p.PeerManager,
router *p2p.Router,
chCreator p2p.ChannelCreator,
logger log.Logger,
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
@@ -282,20 +273,15 @@ func createConsensusReactor(
consensusState.SetPrivValidator(ctx, privValidator)
}
reactor, err := consensus.NewReactor(
ctx,
reactor := consensus.NewReactor(
logger,
consensusState,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerManager.Subscribe,
eventBus,
waitSync,
csMetrics,
)
if err != nil {
return nil, nil, err
}
return reactor, consensusState, nil
}
@@ -375,7 +361,6 @@ func createPeerManager(
}
func createRouter(
ctx context.Context,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
@@ -405,7 +390,6 @@ func createRouter(
}
return p2p.NewRouter(
ctx,
p2pLogger,
p2pMetrics,
nodeInfo,
@@ -422,7 +406,7 @@ func makeNodeInfo(
nodeKey types.NodeKey,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
state sm.State,
versionInfo version.Consensus,
) (types.NodeInfo, error) {
txIndexerStatus := "off"
@@ -434,8 +418,8 @@ func makeNodeInfo(
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
P2P: version.P2PProtocol, // global
Block: state.Version.Consensus.Block,
App: state.Version.Consensus.App,
Block: versionInfo.Block,
App: versionInfo.App,
},
NodeID: nodeKey.ID,
Network: genDoc.ChainID,