Merge remote-tracking branch 'origin/master' into wb/proposer-based-timestamps

This commit is contained in:
William Banfield
2021-11-23 18:06:46 -05:00
583 changed files with 16589 additions and 48821 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -7,115 +7,142 @@ import (
"math"
"net"
"os"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/service"
tmtime "github.com/tendermint/tendermint/libs/time"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
func TestNodeStartStop(t *testing.T) {
config := cfg.ResetTestRoot("node_node_test")
cfg, err := config.ResetTestRoot("node_node_test")
require.NoError(t, err)
defer os.RemoveAll(config.RootDir)
defer os.RemoveAll(cfg.RootDir)
ctx, bcancel := context.WithCancel(context.Background())
defer bcancel()
// create & start node
ns, err := newDefaultNode(config, log.TestingLogger())
ns, err := newDefaultNode(ctx, cfg, log.TestingLogger())
require.NoError(t, err)
require.NoError(t, ns.Start())
require.NoError(t, ns.Start(ctx))
t.Cleanup(func() {
if ns.IsRunning() {
bcancel()
ns.Wait()
}
})
n, ok := ns.(*nodeImpl)
require.True(t, ok)
// wait for the node to produce a block
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
blocksSub, err := n.EventBus().SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
ClientID: "node_test",
Query: types.EventQueryNewBlock,
})
require.NoError(t, err)
select {
case <-blocksSub.Out():
case <-blocksSub.Canceled():
t.Fatal("blocksSub was canceled")
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for the node to produce a block")
tctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if _, err := blocksSub.Next(tctx); err != nil {
t.Fatalf("Waiting for event: %v", err)
}
// stop the node
go func() {
err = n.Stop()
require.NoError(t, err)
bcancel()
n.Wait()
}()
select {
case <-n.Quit():
case <-time.After(5 * time.Second):
pid := os.Getpid()
p, err := os.FindProcess(pid)
if err != nil {
panic(err)
return
case <-time.After(10 * time.Second):
if n.IsRunning() {
t.Fatal("timed out waiting for shutdown")
}
err = p.Signal(syscall.SIGABRT)
fmt.Println(err)
t.Fatal("timed out waiting for shutdown")
}
}
func getTestNode(t *testing.T, conf *cfg.Config, logger log.Logger) *nodeImpl {
func getTestNode(ctx context.Context, t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl {
t.Helper()
ns, err := newDefaultNode(conf, logger)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ns, err := newDefaultNode(ctx, conf, logger)
require.NoError(t, err)
n, ok := ns.(*nodeImpl)
require.True(t, ok)
t.Cleanup(func() {
cancel()
if n.IsRunning() {
ns.Wait()
}
})
return n
}
func TestNodeDelayedStart(t *testing.T) {
config := cfg.ResetTestRoot("node_delayed_start_test")
defer os.RemoveAll(config.RootDir)
cfg, err := config.ResetTestRoot("node_delayed_start_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
now := tmtime.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create & start node
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(ctx, t, cfg, log.TestingLogger())
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck // ignore for tests
require.NoError(t, n.Start(ctx))
startTime := tmtime.Now()
assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
}
func TestNodeSetAppVersion(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
cfg, err := config.ResetTestRoot("node_app_version_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create node
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(ctx, t, cfg, log.TestingLogger())
// default config uses the kvstore app
var appVersion uint64 = kvstore.ProtocolVersion
appVersion := kvstore.ProtocolVersion
// check version is set in state
state, err := n.stateStore.Load()
@@ -129,9 +156,13 @@ func TestNodeSetAppVersion(t *testing.T) {
func TestNodeSetPrivValTCP(t *testing.T) {
addr := "tcp://" + testFreeAddr(t)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(config.RootDir)
config.PrivValidator.ListenAddr = addr
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = addr
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
dialerEndpoint := privval.NewSignerDialerEndpoint(
@@ -142,41 +173,54 @@ func TestNodeSetPrivValTCP(t *testing.T) {
signerServer := privval.NewSignerServer(
dialerEndpoint,
config.ChainID(),
cfg.ChainID(),
types.NewMockPV(),
)
go func() {
err := signerServer.Start()
err := signerServer.Start(ctx)
if err != nil {
panic(err)
}
}()
defer signerServer.Stop() //nolint:errcheck // ignore for tests
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(ctx, t, cfg, log.TestingLogger())
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}
// address without a protocol must result in error
func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
addrNoPrefix := testFreeAddr(t)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(config.RootDir)
config.PrivValidator.ListenAddr = addrNoPrefix
cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = addrNoPrefix
n, err := newDefaultNode(ctx, cfg, log.TestingLogger())
_, err := newDefaultNode(config, log.TestingLogger())
assert.Error(t, err)
if n != nil && n.IsRunning() {
cancel()
n.Wait()
}
}
func TestNodeSetPrivValIPC(t *testing.T) {
tmpfile := "/tmp/kms." + tmrand.Str(6) + ".sock"
defer os.Remove(tmpfile) // clean up
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(config.RootDir)
config.PrivValidator.ListenAddr = "unix://" + tmpfile
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := config.ResetTestRoot("node_priv_val_tcp_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = "unix://" + tmpfile
dialer := privval.DialUnixFn(tmpfile)
dialerEndpoint := privval.NewSignerDialerEndpoint(
@@ -187,16 +231,16 @@ func TestNodeSetPrivValIPC(t *testing.T) {
pvsc := privval.NewSignerServer(
dialerEndpoint,
config.ChainID(),
cfg.ChainID(),
types.NewMockPV(),
)
go func() {
err := pvsc.Start()
err := pvsc.Start(ctx)
require.NoError(t, err)
}()
defer pvsc.Stop() //nolint:errcheck // ignore for tests
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(ctx, t, cfg, log.TestingLogger())
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}
@@ -212,18 +256,22 @@ func testFreeAddr(t *testing.T) string {
// create a proposal block using real and full
// mempool and evidence pool and validate it.
func TestCreateProposalBlock(t *testing.T) {
config := cfg.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(config.RootDir)
cc := proxy.NewLocalClientCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := config.ResetTestRoot("node_create_proposal")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err = proxyApp.Start(ctx)
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
logger := log.TestingLogger()
const height int64 = 1
state, stateDB, privVals := state(1, height)
state, stateDB, privVals := state(t, 1, height)
stateStore := sm.NewStore(stateDB)
maxBytes := 16384
const partSize uint32 = 256
@@ -232,15 +280,12 @@ func TestCreateProposalBlock(t *testing.T) {
state.ConsensusParams.Evidence.MaxBytes = maxEvidenceBytes
proposerAddr, _ := state.Validators.GetByIndex(0)
mp := mempoolv0.NewCListMempool(
config.Mempool,
mp := mempool.NewTxMempool(
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(mempool.NopMetrics()),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)
mp.SetLogger(logger)
// Make EvidencePool
evidenceDB := dbm.NewMemDB()
@@ -250,7 +295,7 @@ func TestCreateProposalBlock(t *testing.T) {
// fill the evidence pool with more evidence
// than can fit in a block
var currentBytes int64 = 0
var currentBytes int64
for currentBytes <= maxEvidenceBytes {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, time.Now(), privVals[0], "test-chain")
currentBytes += int64(len(ev.Bytes()))
@@ -267,7 +312,7 @@ func TestCreateProposalBlock(t *testing.T) {
txLength := 100
for i := 0; i <= maxBytes/txLength; i++ {
tx := tmrand.Bytes(txLength)
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err := mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
}
@@ -304,18 +349,22 @@ func TestCreateProposalBlock(t *testing.T) {
}
func TestMaxTxsProposalBlockSize(t *testing.T) {
config := cfg.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(config.RootDir)
cc := proxy.NewLocalClientCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := config.ResetTestRoot("node_create_proposal")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err = proxyApp.Start(ctx)
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
logger := log.TestingLogger()
const height int64 = 1
state, stateDB, _ := state(1, height)
state, stateDB, _ := state(t, 1, height)
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 16384
@@ -324,20 +373,18 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
proposerAddr, _ := state.Validators.GetByIndex(0)
// Make Mempool
mp := mempoolv0.NewCListMempool(
config.Mempool,
mp := mempool.NewTxMempool(
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(mempool.NopMetrics()),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)
mp.SetLogger(logger)
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))
tx := tmrand.Bytes(txLength - 4) // to account for the varint
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
blockExec := sm.NewBlockExecutor(
@@ -366,17 +413,20 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
}
func TestMaxProposalBlockSize(t *testing.T) {
config := cfg.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(config.RootDir)
cc := proxy.NewLocalClientCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := config.ResetTestRoot("node_create_proposal")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
err = proxyApp.Start(ctx)
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
logger := log.TestingLogger()
state, stateDB, _ := state(types.MaxVotesCount, int64(1))
state, stateDB, _ := state(t, types.MaxVotesCount, int64(1))
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 1024 * 1024 * 2
@@ -384,26 +434,23 @@ func TestMaxProposalBlockSize(t *testing.T) {
proposerAddr, _ := state.Validators.GetByIndex(0)
// Make Mempool
mp := mempoolv0.NewCListMempool(
config.Mempool,
mp := mempool.NewTxMempool(
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(mempool.NopMetrics()),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)
mp.SetLogger(logger)
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, types.MaxVotesCount))
tx := tmrand.Bytes(txLength - 6) // to account for the varint
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
// now produce more txs than what a normal block can hold with 10 smaller txs
// At the end of the test, only the single big tx should be added
for i := 0; i < 10; i++ {
tx := tmrand.Bytes(10)
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
}
@@ -481,92 +528,126 @@ func TestMaxProposalBlockSize(t *testing.T) {
}
func TestNodeNewSeedNode(t *testing.T) {
config := cfg.ResetTestRoot("node_new_node_custom_reactors_test")
config.Mode = cfg.ModeSeed
defer os.RemoveAll(config.RootDir)
cfg, err := config.ResetTestRoot("node_new_node_custom_reactors_test")
require.NoError(t, err)
cfg.Mode = config.ModeSeed
defer os.RemoveAll(cfg.RootDir)
nodeKey, err := types.LoadOrGenNodeKey(config.NodeKeyFile())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
require.NoError(t, err)
ns, err := makeSeedNode(config,
cfg.DefaultDBProvider,
ns, err := makeSeedNode(cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(config),
defaultGenesisDocProviderFunc(cfg),
log.TestingLogger(),
)
t.Cleanup(ns.Wait)
require.NoError(t, err)
n, ok := ns.(*nodeImpl)
require.True(t, ok)
err = n.Start()
err = n.Start(ctx)
require.NoError(t, err)
assert.True(t, n.pexReactor.IsRunning())
cancel()
n.Wait()
assert.False(t, n.pexReactor.IsRunning())
}
func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
cfg, err := config.ResetTestRoot("node_app_version_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(logger)
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(ctx, logger)
require.NoError(t, err)
t.Cleanup(eventBus.Wait)
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
indexService, eventSinks, err := createAndStartIndexerService(ctx, cfg,
config.DefaultDBProvider, eventBus, logger, genDoc.ChainID,
indexer.NopMetrics())
require.NoError(t, err)
indexService, eventSinks, err := createAndStartIndexerService(config,
cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
t.Cleanup(indexService.Wait)
return eventSinks
}
cleanup := func(ns service.Service) func() {
return func() {
n, ok := ns.(*nodeImpl)
if !ok {
return
}
if n == nil {
return
}
if !n.IsRunning() {
return
}
cancel()
n.Wait()
}
}
eventSinks := setupTest(t, config)
eventSinks := setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.KV, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null"}
eventSinks = setupTest(t, config)
cfg.TxIndex.Indexer = []string{"null"}
eventSinks = setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null", "kv"}
eventSinks = setupTest(t, config)
cfg.TxIndex.Indexer = []string{"null", "kv"}
eventSinks = setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(config, logger)
cfg.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(ctx, cfg, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("unsupported event sink type"), err)
assert.Contains(t, err.Error(), "unsupported event sink type")
t.Cleanup(cleanup(ns))
config.TxIndex.Indexer = []string{}
eventSinks = setupTest(t, config)
cfg.TxIndex.Indexer = []string{}
eventSinks = setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(config, logger)
cfg.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(ctx, cfg, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
t.Cleanup(cleanup(ns))
var psqlConn = "test"
config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, config)
cfg.TxIndex.Indexer = []string{"psql"}
cfg.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, config)
cfg.TxIndex.Indexer = []string{"psql", "kv"}
cfg.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, cfg)
assert.Equal(t, 2, len(eventSinks))
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
@@ -577,9 +658,9 @@ func TestNodeSetEventSink(t *testing.T) {
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
config.TxIndex.Indexer = []string{"kv", "psql"}
config.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, config)
cfg.TxIndex.Indexer = []string{"kv", "psql"}
cfg.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, cfg)
assert.Equal(t, 2, len(eventSinks))
if eventSinks[0].Type() == indexer.KV {
@@ -590,20 +671,23 @@ func TestNodeSetEventSink(t *testing.T) {
}
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, logger)
cfg.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
cfg.TxIndex.PsqlConn = psqlConn
ns, err = newDefaultNode(ctx, cfg, logger)
require.Error(t, err)
assert.Equal(t, e, err)
assert.Contains(t, err.Error(), e.Error())
t.Cleanup(cleanup(ns))
config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, logger)
cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
cfg.TxIndex.PsqlConn = psqlConn
ns, err = newDefaultNode(ctx, cfg, logger)
require.Error(t, err)
assert.Equal(t, e, err)
assert.Contains(t, err.Error(), e.Error())
t.Cleanup(cleanup(ns))
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
func state(t *testing.T, nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
t.Helper()
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)
for i := 0; i < nVals; i++ {
@@ -624,17 +708,15 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
// save validators to db for 2 heights
stateDB := dbm.NewMemDB()
t.Cleanup(func() { require.NoError(t, stateDB.Close()) })
stateStore := sm.NewStore(stateDB)
if err := stateStore.Save(s); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(s))
for i := 1; i < int(height); i++ {
s.LastBlockHeight++
s.LastValidators = s.Validators.Copy()
if err := stateStore.Save(s); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(s))
}
return s, stateDB, privVals
}
@@ -648,14 +730,15 @@ func loadStatefromGenesis(t *testing.T) sm.State {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
config := cfg.ResetTestRoot("load_state_from_genesis")
cfg, err := config.ResetTestRoot("load_state_from_genesis")
require.NoError(t, err)
loadedState, err := stateStore.Load()
require.NoError(t, err)
require.True(t, loadedState.IsEmpty())
valSet, _ := factory.ValidatorSet(0, 10)
genDoc := factory.GenesisDoc(config, time.Now(), valSet.Validators, nil)
genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, nil)
state, err := loadStateFromDBOrGenesisDocProvider(
stateStore,

View File

@@ -2,13 +2,14 @@
package node
import (
"context"
"fmt"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -16,8 +17,12 @@ import (
// process that host their own process-local tendermint node. This is
// equivalent to running tendermint in it's own process communicating
// to an external ABCI application.
func NewDefault(conf *config.Config, logger log.Logger) (service.Service, error) {
return newDefaultNode(conf, logger)
func NewDefault(
ctx context.Context,
conf *config.Config,
logger log.Logger,
) (service.Service, error) {
return newDefaultNode(ctx, conf, logger)
}
// New constructs a tendermint node. The ClientCreator makes it
@@ -26,9 +31,11 @@ func NewDefault(conf *config.Config, logger log.Logger) (service.Service, error)
// Genesis document: if the value is nil, the genesis document is read
// from the file specified in the config, and otherwise the node uses
// value of the final argument.
func New(conf *config.Config,
func New(
ctx context.Context,
conf *config.Config,
logger log.Logger,
cf proxy.ClientCreator,
cf abciclient.Creator,
gen *types.GenesisDoc,
) (service.Service, error) {
nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile())
@@ -51,7 +58,9 @@ func New(conf *config.Config,
return nil, err
}
return makeNode(conf,
return makeNode(
ctx,
conf,
pval,
nodeKey,
cf,

View File

@@ -5,110 +5,146 @@ import (
"context"
"errors"
"fmt"
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2"
cs "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/blocksync"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/internal/p2p/pex"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/state/indexer/sink"
"github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmstrings "github.com/tendermint/tendermint/libs/strings"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
)
func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return
}
blockStore = store.NewBlockStore(blockStoreDB)
type closer func() error
stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
return
func makeCloser(cs []closer) closer {
return func() error {
errs := make([]string, 0, len(cs))
for _, cl := range cs {
if err := cl(); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) >= 0 {
return errors.New(strings.Join(errs, "; "))
}
return nil
}
}
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
func convertCancelCloser(cancel context.CancelFunc) closer {
return func() error { cancel(); return nil }
}
func combineCloseError(err error, cl closer) error {
if err == nil {
return cl()
}
clerr := cl()
if clerr == nil {
return err
}
return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
}
func initDBs(
cfg *config.Config,
dbProvider config.DBProvider,
) (*store.BlockStore, dbm.DB, closer, error) {
blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return nil, nil, func() error { return nil }, err
}
closers := []closer{}
blockStore := store.NewBlockStore(blockStoreDB)
closers = append(closers, blockStoreDB.Close)
stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg})
if err != nil {
return nil, nil, makeCloser(closers), err
}
closers = append(closers, stateDB.Close)
return blockStore, stateDB, makeCloser(closers), nil
}
func createAndStartProxyAppConns(
ctx context.Context,
clientCreator abciclient.Creator,
logger log.Logger,
metrics *proxy.Metrics,
) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), metrics)
if err := proxyApp.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
return proxyApp, nil
}
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
func createAndStartEventBus(ctx context.Context, logger log.Logger) (*eventbus.EventBus, error) {
eventBus := eventbus.NewDefault(logger.With("module", "events"))
if err := eventBus.Start(ctx); err != nil {
return nil, err
}
return eventBus, nil
}
func createAndStartIndexerService(
config *cfg.Config,
dbProvider cfg.DBProvider,
eventBus *types.EventBus,
ctx context.Context,
cfg *config.Config,
dbProvider config.DBProvider,
eventBus *eventbus.EventBus,
logger log.Logger,
chainID string,
metrics *indexer.Metrics,
) (*indexer.Service, []indexer.EventSink, error) {
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID)
if err != nil {
return nil, nil, err
}
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
indexerService := indexer.NewService(indexer.ServiceArgs{
Sinks: eventSinks,
EventBus: eventBus,
Logger: logger.With("module", "txindex"),
Metrics: metrics,
})
if err := indexerService.Start(); err != nil {
if err := indexerService.Start(ctx); err != nil {
return nil, nil, err
}
return indexerService, eventSinks, nil
}
func doHandshake(
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
genDoc *types.GenesisDoc,
eventBus types.BlockEventPublisher,
proxyApp proxy.AppConns,
consensusLogger log.Logger) error {
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
}
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) {
// Log the version info.
logger.Info("Version info",
"tmVersion", version.TMVersion,
@@ -124,17 +160,23 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
"state", state.Version.Consensus.Block,
)
}
switch {
case mode == cfg.ModeFull:
consensusLogger.Info("This node is a fullnode")
case mode == cfg.ModeValidator:
switch mode {
case config.ModeFull:
logger.Info("This node is a fullnode")
case config.ModeValidator:
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
logger.Info("This node is a validator",
"addr", addr,
"pubKey", pubKey.Bytes(),
)
} else {
consensusLogger.Info("This node is a validator (NOT in the active validator set)",
"addr", addr, "pubKey", pubKey.Bytes())
logger.Info("This node is a validator (NOT in the active validator set)",
"addr", addr,
"pubKey", pubKey.Bytes(),
)
}
}
}
@@ -148,297 +190,215 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolReactor(
config *cfg.Config,
cfg *config.Config,
proxyApp proxy.AppConns,
state sm.State,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool", "version", config.Mempool.Version)
channelShims := mempoolv0.GetChannelShims(config.Mempool)
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
logger = logger.With("module", "mempool")
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
ch, err := router.OpenChannel(mempool.GetChannelDescriptor(cfg.Mempool))
if err != nil {
return nil, nil, err
}
mp := mempool.NewTxMempool(
logger,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempool.WithMetrics(memplMetrics),
mempool.WithPreCheck(sm.TxPreCheck(state)),
mempool.WithPostCheck(sm.TxPostCheck(state)),
)
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
reactor := mempool.NewReactor(
logger,
cfg.Mempool,
peerManager,
mp,
ch,
peerManager.Subscribe(),
)
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
switch config.Mempool.Version {
case cfg.MempoolV0:
mp := mempoolv0.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)
mp.SetLogger(logger)
reactor := mempoolv0.NewReactor(
logger,
config.Mempool,
peerManager,
mp,
channels[mempool.MempoolChannel],
peerUpdates,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
return reactorShim, reactor, mp, nil
case cfg.MempoolV1:
mp := mempoolv1.NewTxMempool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
reactor := mempoolv1.NewReactor(
logger,
config.Mempool,
peerManager,
mp,
channels[mempool.MempoolChannel],
peerUpdates,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
return reactorShim, reactor, mp, nil
default:
return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", config.Mempool.Version)
}
return reactor, mp, nil
}
func createEvidenceReactor(
config *cfg.Config,
dbProvider cfg.DBProvider,
cfg *config.Config,
dbProvider config.DBProvider,
stateDB dbm.DB,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
) (*evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg})
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
logger = logger.With("module", "evidence")
reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
ch, err := router.OpenChannel(evidence.GetChannelDescriptor())
if err != nil {
return nil, nil, fmt.Errorf("creating evidence channel: %w", err)
}
evidenceReactor := evidence.NewReactor(
logger,
channels[evidence.EvidenceChannel],
peerUpdates,
ch,
peerManager.Subscribe(),
evidencePool,
)
return reactorShim, evidenceReactor, evidencePool, nil
return evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(
logger log.Logger,
config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *cs.Reactor,
csReactor *consensus.Reactor,
peerManager *p2p.PeerManager,
router *p2p.Router,
blockSync bool,
metrics *cs.Metrics,
) (*p2p.ReactorShim, service.Service, error) {
metrics *consensus.Metrics,
) (service.Service, error) {
logger = logger.With("module", "blockchain")
switch config.BlockSync.Version {
case cfg.BlockSyncV0:
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
channels[bcv0.BlockSyncChannel], peerUpdates, blockSync,
metrics,
)
if err != nil {
return nil, nil, err
}
return reactorShim, reactor, nil
case cfg.BlockSyncV2:
return nil, nil, errors.New("block sync version v2 is no longer supported. Please use v0")
default:
return nil, nil, fmt.Errorf("unknown block sync version %s", config.BlockSync.Version)
ch, err := router.OpenChannel(blocksync.GetChannelDescriptor())
if err != nil {
return nil, err
}
peerUpdates := peerManager.Subscribe()
reactor, err := blocksync.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
ch, peerUpdates, blockSync,
metrics,
)
if err != nil {
return nil, err
}
return reactor, nil
}
func createConsensusReactor(
config *cfg.Config,
cfg *config.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mp mempool.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
csMetrics *consensus.Metrics,
waitSync bool,
eventBus *types.EventBus,
eventBus *eventbus.EventBus,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
consensusState := cs.NewState(
config.Consensus,
consensusState := consensus.NewState(
logger,
cfg.Consensus,
state.Copy(),
blockExec,
blockStore,
mp,
evidencePool,
cs.StateMetrics(csMetrics),
consensus.StateMetrics(csMetrics),
)
consensusState.SetLogger(logger)
if privValidator != nil && config.Mode == cfg.ModeValidator {
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(privValidator)
}
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
csChDesc := consensus.GetChannelDescriptors()
channels := make(map[p2p.ChannelID]*p2p.Channel, len(csChDesc))
for idx := range csChDesc {
chd := csChDesc[idx]
ch, err := router.OpenChannel(chd)
if err != nil {
return nil, nil, err
}
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
channels[ch.ID] = ch
}
reactor := cs.NewReactor(
reactor := consensus.NewReactor(
logger,
consensusState,
channels[cs.StateChannel],
channels[cs.DataChannel],
channels[cs.VoteChannel],
channels[cs.VoteSetBitsChannel],
peerUpdates,
channels[consensus.StateChannel],
channels[consensus.DataChannel],
channels[consensus.VoteChannel],
channels[consensus.VoteSetBitsChannel],
peerManager.Subscribe(),
waitSync,
cs.ReactorMetrics(csMetrics),
consensus.ReactorMetrics(csMetrics),
)
// Services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor.
reactor.SetEventBus(eventBus)
return reactorShim, reactor, consensusState
return reactor, consensusState, nil
}
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
conf := conn.DefaultMConnConfig()
conf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
conf.SendRate = cfg.P2P.SendRate
conf.RecvRate = cfg.P2P.RecvRate
conf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
return p2p.NewMConnTransport(
logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
logger, conf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
len(tmstrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
),
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
}
func createPeerManager(
config *cfg.Config,
dbProvider cfg.DBProvider,
p2pLogger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
nodeID types.NodeID,
) (*p2p.PeerManager, error) {
) (*p2p.PeerManager, closer, error) {
privatePeerIDs := make(map[types.NodeID]struct{})
for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") {
privatePeerIDs[types.NodeID(id)] = struct{}{}
}
var maxConns uint16
switch {
case config.P2P.MaxConnections > 0:
maxConns = config.P2P.MaxConnections
case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
if x > math.MaxUint16 {
return nil, fmt.Errorf(
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
config.P2P.MaxNumInboundPeers,
config.P2P.MaxNumOutboundPeers,
math.MaxUint16,
)
}
maxConns = uint16(x)
case cfg.P2P.MaxConnections > 0:
maxConns = cfg.P2P.MaxConnections
default:
maxConns = 64
}
privatePeerIDs := make(map[types.NodeID]struct{})
for _, id := range tmstrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
privatePeerIDs[types.NodeID(id)] = struct{}{}
}
options := p2p.PeerManagerOptions{
MaxConnected: maxConns,
MaxConnectedUpgrade: 4,
@@ -451,218 +411,89 @@ func createPeerManager(
}
peers := []p2p.NodeAddress{}
for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
}
for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
}
peerDB, err := dbProvider(&cfg.DBContext{ID: "peerstore", Config: config})
peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
if err != nil {
return nil, err
return nil, func() error { return nil }, err
}
peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
}
for _, peer := range peers {
if _, err := peerManager.Add(peer); err != nil {
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
}
}
return peerManager, nil
return peerManager, peerDB.Close, nil
}
func createRouter(
p2pLogger log.Logger,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
nodeKey types.NodeKey,
peerManager *p2p.PeerManager,
transport p2p.Transport,
options p2p.RouterOptions,
conf *config.Config,
proxyApp proxy.AppConns,
) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, conf)
ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(conf.P2P.ListenAddress))
if err != nil {
return nil, err
}
return p2p.NewRouter(
p2pLogger,
p2pMetrics,
nodeInfo,
privKey,
nodeKey.PrivKey,
peerManager,
[]p2p.Transport{transport},
options,
[]p2p.Endpoint{ep},
getRouterConfig(conf, proxyApp),
)
}
func createSwitch(
config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
mempoolReactor *p2p.ReactorShim,
bcReactor p2p.Reactor,
stateSyncReactor *p2p.ReactorShim,
consensusReactor *p2p.ReactorShim,
evidenceReactor *p2p.ReactorShim,
proxyApp proxy.AppConns,
nodeInfo types.NodeInfo,
nodeKey types.NodeKey,
p2pLogger log.Logger,
) *p2p.Switch {
var (
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
if !config.P2P.AllowDuplicateIP {
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
}
// Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer.
if config.FilterPeers {
connFilters = append(
connFilters,
// ABCI query for address filtering.
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
},
)
peerFilters = append(
peerFilters,
// ABCI query for ID filtering.
func(_ p2p.IPeerSet, p p2p.Peer) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
},
)
}
sw := p2p.NewSwitch(
config.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
p2p.SwitchConnFilters(connFilters...),
)
sw.SetLogger(p2pLogger)
if config.Mode != cfg.ModeSeed {
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
}
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
return sw
}
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
// Add ourselves to addrbook to prevent dialing ourselves
if config.P2P.ExternalAddress != "" {
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ExternalAddress))
if err != nil {
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
if config.P2P.ListenAddress != "" {
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ListenAddress))
if err != nil {
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
sw.SetAddrBook(addrBook)
return addrBook, nil
}
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger) *pex.Reactor {
reactorConfig := &pex.ReactorConfig{
Seeds: tmstrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.Mode == cfg.ModeSeed,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
}
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook, reactorConfig)
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}
func createPEXReactorV2(
config *cfg.Config,
func createPEXReactor(
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (service.Service, error) {
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
channel, err := router.OpenChannel(pex.ChannelDescriptor())
if err != nil {
return nil, err
}
peerUpdates := peerManager.Subscribe()
return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
return pex.NewReactor(logger, peerManager, channel, peerManager.Subscribe()), nil
}
func makeNodeInfo(
config *cfg.Config,
cfg *config.Config,
nodeKey types.NodeKey,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
@@ -674,17 +505,7 @@ func makeNodeInfo(
txIndexerStatus = "on"
}
var bcChannel byte
switch config.BlockSync.Version {
case cfg.BlockSyncV0:
bcChannel = byte(bcv0.BlockSyncChannel)
case cfg.BlockSyncV2:
bcChannel = bcv2.BlockchainChannel
default:
return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", config.BlockSync.Version)
}
bcChannel := byte(blocksync.BlockSyncChannel)
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
@@ -697,10 +518,10 @@ func makeNodeInfo(
Version: version.TMVersion,
Channels: []byte{
bcChannel,
byte(cs.StateChannel),
byte(cs.DataChannel),
byte(cs.VoteChannel),
byte(cs.VoteSetBitsChannel),
byte(consensus.StateChannel),
byte(consensus.DataChannel),
byte(consensus.VoteChannel),
byte(consensus.VoteSetBitsChannel),
byte(mempool.MempoolChannel),
byte(evidence.EvidenceChannel),
byte(statesync.SnapshotChannel),
@@ -708,21 +529,21 @@ func makeNodeInfo(
byte(statesync.LightBlockChannel),
byte(statesync.ParamsChannel),
},
Moniker: config.Moniker,
Moniker: cfg.Moniker,
Other: types.NodeInfoOther{
TxIndex: txIndexerStatus,
RPCAddress: config.RPC.ListenAddress,
RPCAddress: cfg.RPC.ListenAddress,
},
}
if config.P2P.PexReactor {
if cfg.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := config.P2P.ExternalAddress
lAddr := cfg.P2P.ExternalAddress
if lAddr == "" {
lAddr = config.P2P.ListenAddress
lAddr = cfg.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr
@@ -732,7 +553,7 @@ func makeNodeInfo(
}
func makeSeedNodeInfo(
config *cfg.Config,
cfg *config.Config,
nodeKey types.NodeKey,
genDoc *types.GenesisDoc,
state sm.State,
@@ -747,21 +568,21 @@ func makeSeedNodeInfo(
Network: genDoc.ChainID,
Version: version.TMVersion,
Channels: []byte{},
Moniker: config.Moniker,
Moniker: cfg.Moniker,
Other: types.NodeInfoOther{
TxIndex: "off",
RPCAddress: config.RPC.ListenAddress,
RPCAddress: cfg.RPC.ListenAddress,
},
}
if config.P2P.PexReactor {
if cfg.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := config.P2P.ExternalAddress
lAddr := cfg.P2P.ExternalAddress
if lAddr == "" {
lAddr = config.P2P.ListenAddress
lAddr = cfg.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr