service: remove stop method and use contexts (#7292)
node/node.go (138 changed lines)
@@ -82,7 +82,11 @@ type nodeImpl struct {
 // newDefaultNode returns a Tendermint node with default settings for the
 // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
 // It implements NodeProvider.
-func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, error) {
+func newDefaultNode(
+    ctx context.Context,
+    cfg *config.Config,
+    logger log.Logger,
+) (service.Service, error) {
     nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
     if err != nil {
         return nil, fmt.Errorf("failed to load or gen node key %s: %w", cfg.NodeKeyFile(), err)
@@ -108,7 +112,9 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err
 
     appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
 
-    return makeNode(cfg,
+    return makeNode(
+        ctx,
+        cfg,
         pval,
         nodeKey,
         appClient,
@@ -119,7 +125,9 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err
 }
 
 // makeNode returns a new, ready to go, Tendermint Node.
-func makeNode(cfg *config.Config,
+func makeNode(
+    ctx context.Context,
+    cfg *config.Config,
     privValidator types.PrivValidator,
     nodeKey types.NodeKey,
     clientCreator abciclient.Creator,
@@ -127,7 +135,10 @@ func makeNode(cfg *config.Config,
     dbProvider config.DBProvider,
     logger log.Logger,
 ) (service.Service, error) {
-    closers := []closer{}
+    var cancel context.CancelFunc
+    ctx, cancel = context.WithCancel(ctx)
+
+    closers := []closer{convertCancelCloser(cancel)}
 
     blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider)
     if err != nil {
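The core pattern in this hunk: makeNode derives a cancelable context and records the cancel function as the first closer, so a construction failure both cancels the context and releases whatever was already opened. A minimal Go sketch of that pattern — the closer type and convertCancelCloser mirror this diff, while construct, its steps argument, and the cleanup order are illustrative only:

```go
package nodeutil

import "context"

type closer func() error

// convertCancelCloser adapts a context.CancelFunc to the closer shape used
// by the cleanup helpers, so canceling the node's context is just another
// entry in the cleanup list.
func convertCancelCloser(cancel context.CancelFunc) closer {
    return func() error { cancel(); return nil }
}

// construct sketches the cleanup-on-failure idea from makeNode: the derived
// context's cancel func is the first closer, every later step appends its
// own closer, and a failure part-way releases everything accumulated so far.
func construct(ctx context.Context, steps []func(context.Context) (closer, error)) error {
    ctx, cancel := context.WithCancel(ctx)
    closers := []closer{convertCancelCloser(cancel)}

    cleanup := func() {
        for i := len(closers) - 1; i >= 0; i-- {
            _ = closers[i]() // best-effort, reverse order (illustrative)
        }
    }

    for _, step := range steps {
        cl, err := step(ctx)
        if err != nil {
            cleanup()
            return err
        }
        closers = append(closers, cl)
    }
    return nil
}
```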
@@ -157,7 +168,7 @@ func makeNode(cfg *config.Config,
     nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
 
     // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
-    proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy)
+    proxyApp, err := createAndStartProxyAppConns(ctx, clientCreator, logger, nodeMetrics.proxy)
     if err != nil {
         return nil, combineCloseError(err, makeCloser(closers))
     }
@@ -166,12 +177,13 @@ func makeNode(cfg *config.Config,
     // we might need to index the txs of the replayed block as this might not have happened
     // when the node stopped last time (i.e. the node stopped after it saved the block
     // but before it indexed the txs, or, endblocker panicked)
-    eventBus, err := createAndStartEventBus(logger)
+    eventBus, err := createAndStartEventBus(ctx, logger)
     if err != nil {
         return nil, combineCloseError(err, makeCloser(closers))
     }
 
-    indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus,
+    indexerService, eventSinks, err := createAndStartIndexerService(
+        ctx, cfg, dbProvider, eventBus,
         logger, genDoc.ChainID, nodeMetrics.indexer)
     if err != nil {
         return nil, combineCloseError(err, makeCloser(closers))
@@ -184,14 +196,19 @@ func makeNode(cfg *config.Config,
     // FIXME: we should start services inside OnStart
     switch protocol {
     case "grpc":
-        privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger)
+        privValidator, err = createAndStartPrivValidatorGRPCClient(ctx, cfg, genDoc.ChainID, logger)
         if err != nil {
             return nil, combineCloseError(
                 fmt.Errorf("error with private validator grpc client: %w", err),
                 makeCloser(closers))
         }
     default:
-        privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger)
+        privValidator, err = createAndStartPrivValidatorSocketClient(
+            ctx,
+            cfg.PrivValidator.ListenAddr,
+            genDoc.ChainID,
+            logger,
+        )
         if err != nil {
             return nil, combineCloseError(
                 fmt.Errorf("error with private validator socket client: %w", err),
@@ -201,7 +218,7 @@ func makeNode(cfg *config.Config,
     }
     var pubKey crypto.PubKey
     if cfg.Mode == config.ModeValidator {
-        pubKey, err = privValidator.GetPubKey(context.TODO())
+        pubKey, err = privValidator.GetPubKey(ctx)
         if err != nil {
             return nil, combineCloseError(fmt.Errorf("can't get pubkey: %w", err),
                 makeCloser(closers))
@@ -227,7 +244,7 @@ func makeNode(cfg *config.Config,
     if err := consensus.NewHandshaker(
         logger.With("module", "handshaker"),
         stateStore, state, blockStore, eventBus, genDoc,
-    ).Handshake(proxyApp); err != nil {
+    ).Handshake(ctx, proxyApp); err != nil {
         return nil, combineCloseError(err, makeCloser(closers))
     }
 
@@ -253,7 +270,6 @@ func makeNode(cfg *config.Config,
     nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
     if err != nil {
         return nil, combineCloseError(err, makeCloser(closers))
-
     }
 
     peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
@@ -492,7 +508,7 @@ func makeSeedNode(cfg *config.Config,
 }
 
 // OnStart starts the Node. It implements service.Service.
-func (n *nodeImpl) OnStart() error {
+func (n *nodeImpl) OnStart(ctx context.Context) error {
     if n.config.RPC.PprofListenAddress != "" {
         // this service is not cleaned up (I believe that we'd
         // need to have another thread and a potentially a
@@ -513,7 +529,7 @@ func (n *nodeImpl) OnStart() error {
     // Start the RPC server before the P2P server
     // so we can eg. receive txs for the first block
     if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
-        listeners, err := n.startRPC()
+        listeners, err := n.startRPC(ctx)
         if err != nil {
             return err
         }
@@ -526,39 +542,39 @@ func (n *nodeImpl) OnStart() error {
     }
 
     // Start the transport.
-    if err := n.router.Start(); err != nil {
+    if err := n.router.Start(ctx); err != nil {
         return err
     }
     n.isListening = true
 
     if n.config.Mode != config.ModeSeed {
-        if err := n.bcReactor.Start(); err != nil {
+        if err := n.bcReactor.Start(ctx); err != nil {
             return err
         }
 
         // Start the real consensus reactor separately since the switch uses the shim.
-        if err := n.consensusReactor.Start(); err != nil {
+        if err := n.consensusReactor.Start(ctx); err != nil {
             return err
         }
 
         // Start the real state sync reactor separately since the switch uses the shim.
-        if err := n.stateSyncReactor.Start(); err != nil {
+        if err := n.stateSyncReactor.Start(ctx); err != nil {
             return err
         }
 
         // Start the real mempool reactor separately since the switch uses the shim.
-        if err := n.mempoolReactor.Start(); err != nil {
+        if err := n.mempoolReactor.Start(ctx); err != nil {
             return err
         }
 
         // Start the real evidence reactor separately since the switch uses the shim.
-        if err := n.evidenceReactor.Start(); err != nil {
+        if err := n.evidenceReactor.Start(ctx); err != nil {
             return err
         }
     }
 
     if n.config.P2P.PexReactor {
-        if err := n.pexReactor.Start(); err != nil {
+        if err := n.pexReactor.Start(ctx); err != nil {
             return err
         }
     }
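With the Stop method gone, every reactor started here is expected to tie its run loop to the ctx passed into Start and to exit when that context is canceled. A toy sketch of a service with that shape — this is not the tendermint service package, only an illustration of the contract the Start(ctx) calls above rely on:

```go
package nodeutil

import (
    "context"
    "sync"
    "time"
)

// ctxService is a toy service whose lifetime is bound to the context passed
// to Start; there is no Stop method, callers cancel the context instead.
type ctxService struct {
    wg sync.WaitGroup
}

func (s *ctxService) Start(ctx context.Context) error {
    s.wg.Add(1)
    go func() {
        defer s.wg.Done()
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return // shutdown path: no Stop() call needed
            case <-ticker.C:
                // do one unit of periodic work
            }
        }
    }()
    return nil
}

// Wait blocks until the run loop observes cancellation and exits.
func (s *ctxService) Wait() { s.wg.Wait() }
```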
@@ -591,7 +607,7 @@ func (n *nodeImpl) OnStart() error {
     // bubbling up the error and gracefully shutting down the rest of the node
     go func() {
         n.Logger.Info("starting state sync")
-        state, err := n.stateSyncReactor.Sync(context.TODO())
+        state, err := n.stateSyncReactor.Sync(ctx)
         if err != nil {
             n.Logger.Error("state sync failed; shutting down this node", "err", err)
             // stop the node
@@ -617,7 +633,7 @@ func (n *nodeImpl) OnStart() error {
     // is running
     // FIXME Very ugly to have these metrics bleed through here.
     n.consensusReactor.SetBlockSyncingMetrics(1)
-    if err := bcR.SwitchToBlockSync(state); err != nil {
+    if err := bcR.SwitchToBlockSync(ctx, state); err != nil {
         n.Logger.Error("failed to switch to block sync", "err", err)
         return
     }
@@ -638,19 +654,13 @@ func (n *nodeImpl) OnStart() error {
 
 // OnStop stops the Node. It implements service.Service.
 func (n *nodeImpl) OnStop() {
-
     n.Logger.Info("Stopping Node")
 
     if n.eventBus != nil {
-        // first stop the non-reactor services
-        if err := n.eventBus.Stop(); err != nil {
-            n.Logger.Error("Error closing eventBus", "err", err)
-        }
         n.eventBus.Wait()
     }
     if n.indexerService != nil {
-        if err := n.indexerService.Stop(); err != nil {
-            n.Logger.Error("Error closing indexerService", "err", err)
-        }
         n.indexerService.Wait()
     }
 
     for _, es := range n.eventSinks {
@@ -660,41 +670,14 @@ func (n *nodeImpl) OnStop() {
     }
 
     if n.config.Mode != config.ModeSeed {
-        // now stop the reactors
-
-        // Stop the real blockchain reactor separately since the switch uses the shim.
-        if err := n.bcReactor.Stop(); err != nil {
-            n.Logger.Error("failed to stop the blockchain reactor", "err", err)
-        }
-
-        // Stop the real consensus reactor separately since the switch uses the shim.
-        if err := n.consensusReactor.Stop(); err != nil {
-            n.Logger.Error("failed to stop the consensus reactor", "err", err)
-        }
-
-        // Stop the real state sync reactor separately since the switch uses the shim.
-        if err := n.stateSyncReactor.Stop(); err != nil {
-            n.Logger.Error("failed to stop the state sync reactor", "err", err)
-        }
-
-        // Stop the real mempool reactor separately since the switch uses the shim.
-        if err := n.mempoolReactor.Stop(); err != nil {
-            n.Logger.Error("failed to stop the mempool reactor", "err", err)
-        }
-
-        // Stop the real evidence reactor separately since the switch uses the shim.
-        if err := n.evidenceReactor.Stop(); err != nil {
-            n.Logger.Error("failed to stop the evidence reactor", "err", err)
-        }
-    }
-
-    if err := n.pexReactor.Stop(); err != nil {
-        n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
-    }
-
-    if err := n.router.Stop(); err != nil {
-        n.Logger.Error("failed to stop router", "err", err)
+        n.bcReactor.Wait()
+        n.consensusReactor.Wait()
+        n.stateSyncReactor.Wait()
+        n.mempoolReactor.Wait()
+        n.evidenceReactor.Wait()
     }
+    n.pexReactor.Wait()
+    n.router.Wait()
     n.isListening = false
 
     // finally stop the listeners / external services
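Given services shaped like the sketch above, OnStop no longer calls Stop on each reactor: shutdown is one cancellation of the node's context followed by a Wait per reactor. A compact sketch of that sequence, with the waiter interface invented here for illustration:

```go
package nodeutil

import "context"

type waiter interface{ Wait() }

// shutdown cancels the shared context and then blocks until every service
// has observed the cancellation and returned, mirroring the new OnStop.
func shutdown(cancel context.CancelFunc, services ...waiter) {
    cancel()
    for _, s := range services {
        s.Wait()
    }
}
```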
@@ -706,9 +689,7 @@ func (n *nodeImpl) OnStop() {
     }
 
     if pvsc, ok := n.privValidator.(service.Service); ok {
-        if err := pvsc.Stop(); err != nil {
-            n.Logger.Error("Error closing private validator", "err", err)
-        }
         pvsc.Wait()
     }
 
     if n.prometheusSrv != nil {
@@ -719,13 +700,15 @@ func (n *nodeImpl) OnStop() {
 
     }
     if err := n.shutdownOps(); err != nil {
-        n.Logger.Error("problem shutting down additional services", "err", err)
+        if strings.TrimSpace(err.Error()) != "" {
+            n.Logger.Error("problem shutting down additional services", "err", err)
+        }
     }
 }
 
-func (n *nodeImpl) startRPC() ([]net.Listener, error) {
+func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) {
     if n.config.Mode == config.ModeValidator {
-        pubKey, err := n.privValidator.GetPubKey(context.TODO())
+        pubKey, err := n.privValidator.GetPubKey(ctx)
         if pubKey == nil || err != nil {
             return nil, fmt.Errorf("can't get pubkey: %w", err)
         }
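A side effect of threading ctx everywhere is that calls which previously used context.TODO() or context.Background() — GetPubKey in makeNode and startRPC, QuerySync in the peer filter — can now be bounded by the caller. A sketch of wrapping such a call with a deadline; the helper and the five-second timeout are illustrative, only the GetPubKey(ctx) signature comes from this diff:

```go
package nodeutil

import (
    "context"
    "fmt"
    "time"

    "github.com/tendermint/tendermint/crypto"
    "github.com/tendermint/tendermint/types"
)

// pubKeyWithTimeout bounds the privval round trip now that GetPubKey takes
// the caller's context instead of context.TODO().
func pubKeyWithTimeout(ctx context.Context, pv types.PrivValidator) (crypto.PubKey, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // illustrative deadline
    defer cancel()

    pubKey, err := pv.GetPubKey(ctx)
    if err != nil {
        return nil, fmt.Errorf("can't get pubkey: %w", err)
    }
    return pubKey, nil
}
```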
@@ -970,8 +953,8 @@ func loadStateFromDBOrGenesisDocProvider(
 }
 
 func createAndStartPrivValidatorSocketClient(
-    listenAddr,
-    chainID string,
+    ctx context.Context,
+    listenAddr, chainID string,
     logger log.Logger,
 ) (types.PrivValidator, error) {
 
@@ -980,13 +963,13 @@ func createAndStartPrivValidatorSocketClient(
         return nil, fmt.Errorf("failed to start private validator: %w", err)
     }
 
-    pvsc, err := privval.NewSignerClient(pve, chainID)
+    pvsc, err := privval.NewSignerClient(ctx, pve, chainID)
     if err != nil {
         return nil, fmt.Errorf("failed to start private validator: %w", err)
     }
 
     // try to get a pubkey from private validate first time
-    _, err = pvsc.GetPubKey(context.TODO())
+    _, err = pvsc.GetPubKey(ctx)
     if err != nil {
         return nil, fmt.Errorf("can't get pubkey: %w", err)
     }
@@ -1001,6 +984,7 @@ func createAndStartPrivValidatorSocketClient(
 }
 
 func createAndStartPrivValidatorGRPCClient(
+    ctx context.Context,
     cfg *config.Config,
     chainID string,
     logger log.Logger,
@@ -1016,7 +1000,7 @@ func createAndStartPrivValidatorGRPCClient(
     }
 
     // try to get a pubkey from private validate first time
-    _, err = pvsc.GetPubKey(context.TODO())
+    _, err = pvsc.GetPubKey(ctx)
     if err != nil {
         return nil, fmt.Errorf("can't get pubkey: %w", err)
     }
@@ -1031,7 +1015,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
 
     if conf.FilterPeers && proxyApp != nil {
         opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
-            res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
+            res, err := proxyApp.Query().QuerySync(ctx, abci.RequestQuery{
                 Path: fmt.Sprintf("/p2p/filter/id/%s", id),
             })
             if err != nil {
@@ -7,7 +7,6 @@ import (
     "math"
     "net"
     "os"
-    "syscall"
     "testing"
     "time"
 
@@ -43,14 +42,17 @@ func TestNodeStartStop(t *testing.T) {
 
     defer os.RemoveAll(cfg.RootDir)
 
+    ctx, bcancel := context.WithCancel(context.Background())
+    defer bcancel()
+
     // create & start node
-    ns, err := newDefaultNode(cfg, log.TestingLogger())
+    ns, err := newDefaultNode(ctx, cfg, log.TestingLogger())
     require.NoError(t, err)
-    require.NoError(t, ns.Start())
+    require.NoError(t, ns.Start(ctx))
 
     t.Cleanup(func() {
         if ns.IsRunning() {
-            assert.NoError(t, ns.Stop())
+            bcancel()
+            ns.Wait()
         }
     })
@@ -58,9 +60,6 @@ func TestNodeStartStop(t *testing.T) {
     n, ok := ns.(*nodeImpl)
     require.True(t, ok)
 
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-
     // wait for the node to produce a block
     blocksSub, err := n.EventBus().SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
         ClientID: "node_test",
@@ -75,35 +74,35 @@ func TestNodeStartStop(t *testing.T) {
 
     // stop the node
     go func() {
-        err = n.Stop()
-        require.NoError(t, err)
+        bcancel()
+        n.Wait()
     }()
 
     select {
     case <-n.Quit():
-    case <-time.After(5 * time.Second):
-        pid := os.Getpid()
-        p, err := os.FindProcess(pid)
-        if err != nil {
-            panic(err)
+        return
+    case <-time.After(10 * time.Second):
+        if n.IsRunning() {
+            t.Fatal("timed out waiting for shutdown")
         }
-        err = p.Signal(syscall.SIGABRT)
-        fmt.Println(err)
-        t.Fatal("timed out waiting for shutdown")
     }
 }
 
-func getTestNode(t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl {
+func getTestNode(ctx context.Context, t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl {
     t.Helper()
-    ns, err := newDefaultNode(conf, logger)
+
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
+    ns, err := newDefaultNode(ctx, conf, logger)
     require.NoError(t, err)
 
     n, ok := ns.(*nodeImpl)
     require.True(t, ok)
 
     t.Cleanup(func() {
-        if ns.IsRunning() {
-            assert.NoError(t, ns.Stop())
+        cancel()
+        if n.IsRunning() {
             ns.Wait()
         }
     })
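The test updates all follow the same recipe: build the node with a test-scoped context, start it with that context, and replace Stop in t.Cleanup with cancel plus Wait. A condensed sketch of that recipe — the runnable interface and the helper are illustrative, not part of the diff:

```go
package nodeutil

import (
    "context"
    "testing"
)

type runnable interface {
    Start(context.Context) error
    IsRunning() bool
    Wait()
}

// startNodeForTest wires a test-scoped context to the service lifetime:
// cancel + Wait in t.Cleanup replaces the old assert.NoError(ns.Stop()).
func startNodeForTest(ctx context.Context, t *testing.T, ns runnable) {
    t.Helper()

    ctx, cancel := context.WithCancel(ctx)
    if err := ns.Start(ctx); err != nil {
        cancel()
        t.Fatalf("starting node: %v", err)
    }
    t.Cleanup(func() {
        cancel()
        if ns.IsRunning() {
            ns.Wait()
        }
    })
}
```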
@@ -118,11 +117,14 @@ func TestNodeDelayedStart(t *testing.T) {
     defer os.RemoveAll(cfg.RootDir)
     now := tmtime.Now()
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     // create & start node
-    n := getTestNode(t, cfg, log.TestingLogger())
+    n := getTestNode(ctx, t, cfg, log.TestingLogger())
     n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
 
-    require.NoError(t, n.Start())
+    require.NoError(t, n.Start(ctx))
 
     startTime := tmtime.Now()
     assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
@@ -133,8 +135,11 @@ func TestNodeSetAppVersion(t *testing.T) {
     require.NoError(t, err)
     defer os.RemoveAll(cfg.RootDir)
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     // create node
-    n := getTestNode(t, cfg, log.TestingLogger())
+    n := getTestNode(ctx, t, cfg, log.TestingLogger())
 
     // default config uses the kvstore app
     appVersion := kvstore.ProtocolVersion
@@ -151,6 +156,9 @@ func TestNodeSetAppVersion(t *testing.T) {
 func TestNodeSetPrivValTCP(t *testing.T) {
     addr := "tcp://" + testFreeAddr(t)
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
     require.NoError(t, err)
     defer os.RemoveAll(cfg.RootDir)
@@ -170,31 +178,34 @@ func TestNodeSetPrivValTCP(t *testing.T) {
     )
 
     go func() {
-        err := signerServer.Start()
+        err := signerServer.Start(ctx)
         if err != nil {
             panic(err)
         }
     }()
-    defer signerServer.Stop() //nolint:errcheck // ignore for tests
 
-    n := getTestNode(t, cfg, log.TestingLogger())
+    n := getTestNode(ctx, t, cfg, log.TestingLogger())
     assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
 }
 
 // address without a protocol must result in error
 func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     addrNoPrefix := testFreeAddr(t)
 
     cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
     require.NoError(t, err)
     defer os.RemoveAll(cfg.RootDir)
     cfg.PrivValidator.ListenAddr = addrNoPrefix
+    n, err := newDefaultNode(ctx, cfg, log.TestingLogger())
 
-    n, err := newDefaultNode(cfg, log.TestingLogger())
     assert.Error(t, err)
 
     if n != nil && n.IsRunning() {
-        assert.NoError(t, n.Stop())
+        cancel()
         n.Wait()
     }
 }
@@ -203,6 +214,9 @@ func TestNodeSetPrivValIPC(t *testing.T) {
     tmpfile := "/tmp/kms." + tmrand.Str(6) + ".sock"
     defer os.Remove(tmpfile) // clean up
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
     require.NoError(t, err)
     defer os.RemoveAll(cfg.RootDir)
@@ -222,11 +236,11 @@ func TestNodeSetPrivValIPC(t *testing.T) {
     )
 
     go func() {
-        err := pvsc.Start()
+        err := pvsc.Start(ctx)
         require.NoError(t, err)
     }()
-    defer pvsc.Stop() //nolint:errcheck // ignore for tests
-    n := getTestNode(t, cfg, log.TestingLogger())
+
+    n := getTestNode(ctx, t, cfg, log.TestingLogger())
     assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
 }
 
@@ -248,11 +262,11 @@ func TestCreateProposalBlock(t *testing.T) {
     cfg, err := config.ResetTestRoot("node_create_proposal")
     require.NoError(t, err)
     defer os.RemoveAll(cfg.RootDir)
 
     cc := abciclient.NewLocalCreator(kvstore.NewApplication())
     proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
-    err = proxyApp.Start()
+    err = proxyApp.Start(ctx)
     require.Nil(t, err)
-    defer proxyApp.Stop() //nolint:errcheck // ignore for tests
 
     logger := log.TestingLogger()
 
@@ -344,9 +358,8 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
     defer os.RemoveAll(cfg.RootDir)
     cc := abciclient.NewLocalCreator(kvstore.NewApplication())
     proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
-    err = proxyApp.Start()
+    err = proxyApp.Start(ctx)
     require.Nil(t, err)
-    defer proxyApp.Stop() //nolint:errcheck // ignore for tests
 
     logger := log.TestingLogger()
 
@@ -408,9 +421,8 @@ func TestMaxProposalBlockSize(t *testing.T) {
     defer os.RemoveAll(cfg.RootDir)
     cc := abciclient.NewLocalCreator(kvstore.NewApplication())
     proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
-    err = proxyApp.Start()
+    err = proxyApp.Start(ctx)
     require.Nil(t, err)
-    defer proxyApp.Stop() //nolint:errcheck // ignore for tests
 
     logger := log.TestingLogger()
 
@@ -432,7 +444,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
     // fill the mempool with one txs just below the maximum size
     txLength := int(types.MaxDataBytesNoEvidence(maxBytes, types.MaxVotesCount))
     tx := tmrand.Bytes(txLength - 6) // to account for the varint
-    err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
+    err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
     assert.NoError(t, err)
     // now produce more txs than what a normal block can hold with 10 smaller txs
     // At the end of the test, only the single big tx should be added
@@ -521,6 +533,9 @@ func TestNodeNewSeedNode(t *testing.T) {
     cfg.Mode = config.ModeSeed
     defer os.RemoveAll(cfg.RootDir)
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
     require.NoError(t, err)
 
@@ -530,17 +545,20 @@ func TestNodeNewSeedNode(t *testing.T) {
         defaultGenesisDocProviderFunc(cfg),
         log.TestingLogger(),
     )
+    t.Cleanup(ns.Wait)
 
     require.NoError(t, err)
     n, ok := ns.(*nodeImpl)
     require.True(t, ok)
 
-    err = n.Start()
+    err = n.Start(ctx)
     require.NoError(t, err)
     assert.True(t, n.pexReactor.IsRunning())
 
-    require.NoError(t, n.Stop())
+    cancel()
     n.Wait()
 
     assert.False(t, n.pexReactor.IsRunning())
 }
 
 func TestNodeSetEventSink(t *testing.T) {
@@ -549,19 +567,22 @@ func TestNodeSetEventSink(t *testing.T) {
 
     defer os.RemoveAll(cfg.RootDir)
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     logger := log.TestingLogger()
     setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
-        eventBus, err := createAndStartEventBus(logger)
+        eventBus, err := createAndStartEventBus(ctx, logger)
         require.NoError(t, err)
-        t.Cleanup(func() { require.NoError(t, eventBus.Stop()) })
+        t.Cleanup(eventBus.Wait)
         genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
         require.NoError(t, err)
 
-        indexService, eventSinks, err := createAndStartIndexerService(cfg,
+        indexService, eventSinks, err := createAndStartIndexerService(ctx, cfg,
             config.DefaultDBProvider, eventBus, logger, genDoc.ChainID,
             indexer.NopMetrics())
         require.NoError(t, err)
-        t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
+        t.Cleanup(indexService.Wait)
         return eventSinks
     }
     cleanup := func(ns service.Service) func() {
@@ -576,7 +597,7 @@ func TestNodeSetEventSink(t *testing.T) {
             if !n.IsRunning() {
                 return
             }
-            assert.NoError(t, n.Stop())
+            cancel()
             n.Wait()
         }
     }
@@ -598,7 +619,7 @@ func TestNodeSetEventSink(t *testing.T) {
     assert.Equal(t, indexer.NULL, eventSinks[0].Type())
 
     cfg.TxIndex.Indexer = []string{"kvv"}
-    ns, err := newDefaultNode(cfg, logger)
+    ns, err := newDefaultNode(ctx, cfg, logger)
     assert.Nil(t, ns)
     assert.Contains(t, err.Error(), "unsupported event sink type")
     t.Cleanup(cleanup(ns))
@@ -610,7 +631,7 @@ func TestNodeSetEventSink(t *testing.T) {
     assert.Equal(t, indexer.NULL, eventSinks[0].Type())
 
     cfg.TxIndex.Indexer = []string{"psql"}
-    ns, err = newDefaultNode(cfg, logger)
+    ns, err = newDefaultNode(ctx, cfg, logger)
     assert.Nil(t, ns)
     assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
     t.Cleanup(cleanup(ns))
@@ -652,14 +673,14 @@ func TestNodeSetEventSink(t *testing.T) {
     var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
     cfg.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
     cfg.TxIndex.PsqlConn = psqlConn
-    ns, err = newDefaultNode(cfg, logger)
+    ns, err = newDefaultNode(ctx, cfg, logger)
     require.Error(t, err)
     assert.Contains(t, err.Error(), e.Error())
     t.Cleanup(cleanup(ns))
 
     cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
     cfg.TxIndex.PsqlConn = psqlConn
-    ns, err = newDefaultNode(cfg, logger)
+    ns, err = newDefaultNode(ctx, cfg, logger)
     require.Error(t, err)
     assert.Contains(t, err.Error(), e.Error())
     t.Cleanup(cleanup(ns))
@@ -2,6 +2,7 @@
 package node
 
 import (
+    "context"
     "fmt"
 
     abciclient "github.com/tendermint/tendermint/abci/client"
@@ -16,8 +17,12 @@ import (
 // process that host their own process-local tendermint node. This is
 // equivalent to running tendermint in it's own process communicating
 // to an external ABCI application.
-func NewDefault(conf *config.Config, logger log.Logger) (service.Service, error) {
-    return newDefaultNode(conf, logger)
+func NewDefault(
+    ctx context.Context,
+    conf *config.Config,
+    logger log.Logger,
+) (service.Service, error) {
+    return newDefaultNode(ctx, conf, logger)
 }
 
 // New constructs a tendermint node. The ClientCreator makes it
@@ -26,7 +31,9 @@ func NewDefault(conf *config.Config, logger log.Logger) (service.Service, error)
 // Genesis document: if the value is nil, the genesis document is read
 // from the file specified in the config, and otherwise the node uses
 // value of the final argument.
-func New(conf *config.Config,
+func New(
+    ctx context.Context,
+    conf *config.Config,
     logger log.Logger,
     cf abciclient.Creator,
     gen *types.GenesisDoc,
@@ -51,7 +58,9 @@ func New(conf *config.Config,
         return nil, err
     }
 
-    return makeNode(conf,
+    return makeNode(
+        ctx,
+        conf,
         pval,
         nodeKey,
         cf,
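For embedders, the visible change is that the public constructors New and NewDefault now take the context that bounds the node's lifetime, and the returned service is started with that same context. A usage sketch built only on the signatures shown in this diff; the runNode wrapper and the block-on-ctx pattern are illustrative:

```go
package nodeutil

import (
    "context"

    "github.com/tendermint/tendermint/config"
    "github.com/tendermint/tendermint/libs/log"
    "github.com/tendermint/tendermint/node"
)

// runNode sketches how a caller drives the context-based API: the same
// context is handed to the constructor and to Start, and canceling it is
// how the node is stopped.
func runNode(ctx context.Context, cfg *config.Config, logger log.Logger) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // stopping the node is now: cancel, then Wait

    n, err := node.NewDefault(ctx, cfg, logger)
    if err != nil {
        return err
    }
    if err := n.Start(ctx); err != nil {
        return err
    }

    <-ctx.Done() // block until the caller cancels
    n.Wait()
    return nil
}
```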
@@ -2,6 +2,7 @@ package node
 
 import (
     "bytes"
+    "context"
     "errors"
     "fmt"
     "strings"
@@ -52,6 +53,10 @@ func makeCloser(cs []closer) closer {
     }
 }
 
+func convertCancelCloser(cancel context.CancelFunc) closer {
+    return func() error { cancel(); return nil }
+}
+
 func combineCloseError(err error, cl closer) error {
     if err == nil {
         return cl()
@@ -88,26 +93,31 @@ func initDBs(
     return blockStore, stateDB, makeCloser(closers), nil
 }
 
-// nolint:lll
-func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
+func createAndStartProxyAppConns(
+    ctx context.Context,
+    clientCreator abciclient.Creator,
+    logger log.Logger,
+    metrics *proxy.Metrics,
+) (proxy.AppConns, error) {
     proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), metrics)
 
-    if err := proxyApp.Start(); err != nil {
+    if err := proxyApp.Start(ctx); err != nil {
         return nil, fmt.Errorf("error starting proxy app connections: %v", err)
     }
 
     return proxyApp, nil
 }
 
-func createAndStartEventBus(logger log.Logger) (*eventbus.EventBus, error) {
+func createAndStartEventBus(ctx context.Context, logger log.Logger) (*eventbus.EventBus, error) {
     eventBus := eventbus.NewDefault(logger.With("module", "events"))
-    if err := eventBus.Start(); err != nil {
+    if err := eventBus.Start(ctx); err != nil {
         return nil, err
     }
     return eventBus, nil
 }
 
 func createAndStartIndexerService(
+    ctx context.Context,
     cfg *config.Config,
     dbProvider config.DBProvider,
     eventBus *eventbus.EventBus,
@@ -127,7 +137,7 @@ func createAndStartIndexerService(
         Metrics: metrics,
     })
 
-    if err := indexerService.Start(); err != nil {
+    if err := indexerService.Start(ctx); err != nil {
         return nil, nil, err
     }