Merge branch 'master' into callum/handshake

This commit is contained in:
Callum Waters
2021-10-18 10:25:00 +02:00
491 changed files with 15088 additions and 27540 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -13,50 +13,55 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
consmocks "github.com/tendermint/tendermint/internal/consensus/mocks"
ssmocks "github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
statesync "github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/service"
tmtime "github.com/tendermint/tendermint/libs/time"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
func TestNodeStartStop(t *testing.T) {
config := cfg.ResetTestRoot("node_node_test")
defer os.RemoveAll(config.RootDir)
cfg := config.ResetTestRoot("node_node_test")
defer os.RemoveAll(cfg.RootDir)
// create & start node
ns, err := newDefaultNode(config, log.TestingLogger())
ns, err := newDefaultNode(cfg, log.TestingLogger())
require.NoError(t, err)
require.NoError(t, ns.Start())
t.Cleanup(func() {
if ns.IsRunning() {
assert.NoError(t, ns.Stop())
ns.Wait()
}
})
n, ok := ns.(*nodeImpl)
require.True(t, ok)
t.Logf("Started node %v", n.sw.NodeInfo())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// wait for the node to produce a block
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
blocksSub, err := n.EventBus().Subscribe(ctx, "node_test", types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-blocksSub.Out():
@@ -86,38 +91,45 @@ func TestNodeStartStop(t *testing.T) {
}
}
func getTestNode(t *testing.T, conf *cfg.Config, logger log.Logger) *nodeImpl {
func getTestNode(t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl {
t.Helper()
ns, err := newDefaultNode(conf, logger)
require.NoError(t, err)
n, ok := ns.(*nodeImpl)
require.True(t, ok)
t.Cleanup(func() {
if ns.IsRunning() {
assert.NoError(t, ns.Stop())
ns.Wait()
}
})
return n
}
func TestNodeDelayedStart(t *testing.T) {
config := cfg.ResetTestRoot("node_delayed_start_test")
defer os.RemoveAll(config.RootDir)
cfg := config.ResetTestRoot("node_delayed_start_test")
defer os.RemoveAll(cfg.RootDir)
now := tmtime.Now()
// create & start node
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(t, cfg, log.TestingLogger())
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck // ignore for tests
startTime := tmtime.Now()
assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
}
func TestNodeSetAppVersion(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
cfg := config.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(cfg.RootDir)
// create node
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(t, cfg, log.TestingLogger())
// default config uses the kvstore app
var appVersion uint64 = kvstore.ProtocolVersion
@@ -134,9 +146,9 @@ func TestNodeSetAppVersion(t *testing.T) {
func TestNodeSetPrivValTCP(t *testing.T) {
addr := "tcp://" + testFreeAddr(t)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(config.RootDir)
config.PrivValidator.ListenAddr = addr
cfg := config.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = addr
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
dialerEndpoint := privval.NewSignerDialerEndpoint(
@@ -147,7 +159,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
signerServer := privval.NewSignerServer(
dialerEndpoint,
config.ChainID(),
cfg.ChainID(),
types.NewMockPV(),
)
@@ -159,7 +171,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
}()
defer signerServer.Stop() //nolint:errcheck // ignore for tests
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(t, cfg, log.TestingLogger())
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}
@@ -167,21 +179,26 @@ func TestNodeSetPrivValTCP(t *testing.T) {
func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
addrNoPrefix := testFreeAddr(t)
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(config.RootDir)
config.PrivValidator.ListenAddr = addrNoPrefix
cfg := config.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = addrNoPrefix
_, err := newDefaultNode(config, log.TestingLogger())
n, err := newDefaultNode(cfg, log.TestingLogger())
assert.Error(t, err)
if n != nil && n.IsRunning() {
assert.NoError(t, n.Stop())
n.Wait()
}
}
func TestNodeSetPrivValIPC(t *testing.T) {
tmpfile := "/tmp/kms." + tmrand.Str(6) + ".sock"
defer os.Remove(tmpfile) // clean up
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(config.RootDir)
config.PrivValidator.ListenAddr = "unix://" + tmpfile
cfg := config.ResetTestRoot("node_priv_val_tcp_test")
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = "unix://" + tmpfile
dialer := privval.DialUnixFn(tmpfile)
dialerEndpoint := privval.NewSignerDialerEndpoint(
@@ -192,7 +209,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
pvsc := privval.NewSignerServer(
dialerEndpoint,
config.ChainID(),
cfg.ChainID(),
types.NewMockPV(),
)
@@ -201,7 +218,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
require.NoError(t, err)
}()
defer pvsc.Stop() //nolint:errcheck // ignore for tests
n := getTestNode(t, config, log.TestingLogger())
n := getTestNode(t, cfg, log.TestingLogger())
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}
@@ -217,10 +234,13 @@ func testFreeAddr(t *testing.T) string {
// create a proposal block using real and full
// mempool and evidence pool and validate it.
func TestCreateProposalBlock(t *testing.T) {
config := cfg.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(config.RootDir)
cc := proxy.NewLocalClientCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := config.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@@ -228,7 +248,7 @@ func TestCreateProposalBlock(t *testing.T) {
logger := log.TestingLogger()
const height int64 = 1
state, stateDB, privVals := state(1, height)
state, stateDB, privVals := state(t, 1, height)
stateStore := sm.NewStore(stateDB)
maxBytes := 16384
const partSize uint32 = 256
@@ -238,7 +258,7 @@ func TestCreateProposalBlock(t *testing.T) {
proposerAddr, _ := state.Validators.GetByIndex(0)
mp := mempoolv0.NewCListMempool(
config.Mempool,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(mempool.NopMetrics()),
@@ -255,7 +275,7 @@ func TestCreateProposalBlock(t *testing.T) {
// fill the evidence pool with more evidence
// than can fit in a block
var currentBytes int64 = 0
var currentBytes int64
for currentBytes <= maxEvidenceBytes {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, time.Now(), privVals[0], "test-chain")
currentBytes += int64(len(ev.Bytes()))
@@ -272,7 +292,7 @@ func TestCreateProposalBlock(t *testing.T) {
txLength := 100
for i := 0; i <= maxBytes/txLength; i++ {
tx := tmrand.Bytes(txLength)
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err := mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
}
@@ -309,10 +329,13 @@ func TestCreateProposalBlock(t *testing.T) {
}
func TestMaxTxsProposalBlockSize(t *testing.T) {
config := cfg.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(config.RootDir)
cc := proxy.NewLocalClientCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := config.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
@@ -320,7 +343,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger := log.TestingLogger()
const height int64 = 1
state, stateDB, _ := state(1, height)
state, stateDB, _ := state(t, 1, height)
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 16384
@@ -330,7 +353,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
// Make Mempool
mp := mempoolv0.NewCListMempool(
config.Mempool,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(mempool.NopMetrics()),
@@ -342,7 +365,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))
tx := tmrand.Bytes(txLength - 4) // to account for the varint
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
blockExec := sm.NewBlockExecutor(
@@ -371,17 +394,20 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
}
func TestMaxProposalBlockSize(t *testing.T) {
config := cfg.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(config.RootDir)
cc := proxy.NewLocalClientCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := config.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop() //nolint:errcheck // ignore for tests
logger := log.TestingLogger()
state, stateDB, _ := state(types.MaxVotesCount, int64(1))
state, stateDB, _ := state(t, types.MaxVotesCount, int64(1))
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 1024 * 1024 * 2
@@ -390,7 +416,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
// Make Mempool
mp := mempoolv0.NewCListMempool(
config.Mempool,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(mempool.NopMetrics()),
@@ -408,7 +434,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
// At the end of the test, only the single big tx should be added
for i := 0; i < 10; i++ {
tx := tmrand.Bytes(10)
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
}
@@ -486,115 +512,153 @@ func TestMaxProposalBlockSize(t *testing.T) {
}
func TestNodeNewSeedNode(t *testing.T) {
config := cfg.ResetTestRoot("node_new_node_custom_reactors_test")
config.Mode = cfg.ModeSeed
defer os.RemoveAll(config.RootDir)
cfg := config.ResetTestRoot("node_new_node_custom_reactors_test")
cfg.Mode = config.ModeSeed
defer os.RemoveAll(cfg.RootDir)
nodeKey, err := types.LoadOrGenNodeKey(config.NodeKeyFile())
nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
require.NoError(t, err)
ns, err := makeSeedNode(config,
cfg.DefaultDBProvider,
ns, err := makeSeedNode(cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(config),
defaultGenesisDocProviderFunc(cfg),
log.TestingLogger(),
)
require.NoError(t, err)
n, ok := ns.(*nodeImpl)
require.True(t, ok)
err = n.Start()
require.NoError(t, err)
assert.True(t, n.pexReactor.IsRunning())
require.NoError(t, n.Stop())
}
func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
cfg := config.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(cfg.RootDir)
n := getTestNode(t, config, log.TestingLogger())
logger := log.TestingLogger()
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(logger)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, eventBus.Stop()) })
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.KV, n.eventSinks[0].Type())
indexService, eventSinks, err := createAndStartIndexerService(cfg,
config.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
return eventSinks
}
cleanup := func(ns service.Service) func() {
return func() {
n, ok := ns.(*nodeImpl)
if !ok {
return
}
if n == nil {
return
}
if !n.IsRunning() {
return
}
assert.NoError(t, n.Stop())
n.Wait()
}
}
config.TxIndex.Indexer = []string{"null"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks := setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.KV, eventSinks[0].Type())
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
cfg.TxIndex.Indexer = []string{"null"}
eventSinks = setupTest(t, cfg)
config.TxIndex.Indexer = []string{"null", "kv"}
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
cfg.TxIndex.Indexer = []string{"null", "kv"}
eventSinks = setupTest(t, cfg)
config.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(config, log.TestingLogger())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
cfg.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(cfg, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("unsupported event sink type"), err)
assert.Contains(t, err.Error(), "unsupported event sink type")
t.Cleanup(cleanup(ns))
config.TxIndex.Indexer = []string{}
n = getTestNode(t, config, log.TestingLogger())
cfg.TxIndex.Indexer = []string{}
eventSinks = setupTest(t, cfg)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(config, log.TestingLogger())
cfg.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(cfg, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
t.Cleanup(cleanup(ns))
var psqlConn = "test"
config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
n.OnStop()
cfg.TxIndex.Indexer = []string{"psql"}
cfg.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, cfg)
config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
cfg.TxIndex.Indexer = []string{"psql", "kv"}
cfg.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, cfg)
assert.Equal(t, 2, len(eventSinks))
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()
config.TxIndex.Indexer = []string{"kv", "psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
cfg.TxIndex.Indexer = []string{"kv", "psql"}
cfg.TxIndex.PsqlConn = psqlConn
eventSinks = setupTest(t, cfg)
assert.Equal(t, 2, len(eventSinks))
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
cfg.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
cfg.TxIndex.PsqlConn = psqlConn
ns, err = newDefaultNode(cfg, logger)
require.Error(t, err)
assert.Equal(t, e, err)
assert.Contains(t, err.Error(), e.Error())
t.Cleanup(cleanup(ns))
config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
cfg.TxIndex.PsqlConn = psqlConn
ns, err = newDefaultNode(cfg, logger)
require.Error(t, err)
assert.Equal(t, e, err)
assert.Contains(t, err.Error(), e.Error())
t.Cleanup(cleanup(ns))
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
func state(t *testing.T, nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
t.Helper()
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)
for i := 0; i < nVals; i++ {
@@ -615,17 +679,15 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
// save validators to db for 2 heights
stateDB := dbm.NewMemDB()
t.Cleanup(func() { require.NoError(t, stateDB.Close()) })
stateStore := sm.NewStore(stateDB)
if err := stateStore.Save(s); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(s))
for i := 1; i < int(height); i++ {
s.LastBlockHeight++
s.LastValidators = s.Validators.Copy()
if err := stateStore.Save(s); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(s))
}
return s, stateDB, privVals
}
@@ -639,13 +701,13 @@ func loadStatefromGenesis(t *testing.T) sm.State {
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
config := cfg.ResetTestRoot("load_state_from_genesis")
cfg := config.ResetTestRoot("load_state_from_genesis")
loadedState, err := stateStore.Load()
require.NoError(t, err)
require.True(t, loadedState.IsEmpty())
genDoc, _ := factory.RandGenesisDoc(config, 0, false, 10)
genDoc, _ := factory.RandGenesisDoc(cfg, 0, false, 10)
state, err := loadStateFromDBOrGenesisDocProvider(
stateStore,
@@ -656,65 +718,3 @@ func loadStatefromGenesis(t *testing.T) sm.State {
return state
}
func TestNodeStartStateSync(t *testing.T) {
mockSSR := &statesync.MockSyncReactor{}
mockFSR := &consmocks.BlockSyncReactor{}
mockCSR := &consmocks.ConsSyncReactor{}
mockSP := &ssmocks.StateProvider{}
state := loadStatefromGenesis(t)
config := cfg.ResetTestRoot("load_state_from_genesis")
eventBus, err := createAndStartEventBus(log.TestingLogger())
defer func() {
err := eventBus.Stop()
require.NoError(t, err)
}()
require.NoError(t, err)
require.NotNil(t, eventBus)
sub, err := eventBus.Subscribe(context.Background(), "test-client", types.EventQueryStateSyncStatus, 10)
require.NoError(t, err)
require.NotNil(t, sub)
cfgSS := config.StateSync
mockSSR.On("Sync", context.TODO(), mockSP, cfgSS.DiscoveryTime).Return(state, nil).
On("Backfill", state).Return(nil)
mockCSR.On("SetStateSyncingMetrics", float64(0)).Return().
On("SwitchToConsensus", state, true).Return()
require.NoError(t,
startStateSync(mockSSR, mockFSR, mockCSR, mockSP, config.StateSync, false, state.InitialHeight, eventBus))
for cnt := 0; cnt < 2; {
select {
case <-time.After(3 * time.Second):
t.Errorf("StateSyncStatus timeout")
case msg := <-sub.Out():
if cnt == 0 {
ensureStateSyncStatus(t, msg, false, state.InitialHeight)
cnt++
} else {
// the state height = 0 because we are not actually update the state in this test
ensureStateSyncStatus(t, msg, true, 0)
cnt++
}
}
}
mockSSR.AssertNumberOfCalls(t, "Sync", 1)
mockSSR.AssertNumberOfCalls(t, "Backfill", 1)
mockCSR.AssertNumberOfCalls(t, "SetStateSyncingMetrics", 1)
mockCSR.AssertNumberOfCalls(t, "SwitchToConsensus", 1)
}
func ensureStateSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) {
t.Helper()
status, ok := msg.Data().(types.EventDataStateSyncStatus)
require.True(t, ok)
require.Equal(t, complete, status.Complete)
require.Equal(t, height, status.Height)
}

View File

@@ -4,11 +4,11 @@ package node
import (
"fmt"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -28,7 +28,7 @@ func NewDefault(conf *config.Config, logger log.Logger) (service.Service, error)
// value of the final argument.
func New(conf *config.Config,
logger log.Logger,
cf proxy.ClientCreator,
cf abciclient.Creator,
gen *types.GenesisDoc,
) (service.Service, error) {
nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile())

View File

@@ -2,59 +2,96 @@ package node
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2"
cs "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/blocksync"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/internal/p2p/pex"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/state/indexer/sink"
"github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmstrings "github.com/tendermint/tendermint/libs/strings"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
null "github.com/tendermint/tendermint/state/indexer/sink/null"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
)
func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return
}
blockStore = store.NewBlockStore(blockStoreDB)
type closer func() error
stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
return
func makeCloser(cs []closer) closer {
return func() error {
errs := make([]string, 0, len(cs))
for _, cl := range cs {
if err := cl(); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) >= 0 {
return errors.New(strings.Join(errs, "; "))
}
return nil
}
}
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator)
func combineCloseError(err error, cl closer) error {
if err == nil {
return cl()
}
clerr := cl()
if clerr == nil {
return err
}
return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
}
func initDBs(
cfg *config.Config,
dbProvider config.DBProvider,
) (*store.BlockStore, dbm.DB, closer, error) {
blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return nil, nil, func() error { return nil }, err
}
closers := []closer{}
blockStore := store.NewBlockStore(blockStoreDB)
closers = append(closers, blockStoreDB.Close)
stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg})
if err != nil {
return nil, nil, makeCloser(closers), err
}
closers = append(closers, stateDB.Close)
return blockStore, stateDB, makeCloser(closers), nil
}
// nolint:lll
func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, metrics)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
@@ -72,62 +109,15 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
}
func createAndStartIndexerService(
config *cfg.Config,
dbProvider cfg.DBProvider,
cfg *config.Config,
dbProvider config.DBProvider,
eventBus *types.EventBus,
logger log.Logger,
chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
eventSinks := []indexer.EventSink{}
// check for duplicated sinks
sinks := map[string]bool{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if sinks[sl] {
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = true
}
loop:
for k := range sinks {
switch k {
case string(indexer.NULL):
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
eventSinks = []indexer.EventSink{null.NewEventSink()}
break loop
case string(indexer.KV):
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, nil, errors.New("unsupported event sink type")
}
}
if len(eventSinks) == 0 {
eventSinks = []indexer.EventSink{null.NewEventSink()}
eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID)
if err != nil {
return nil, nil, err
}
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
@@ -157,9 +147,9 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger,
)
}
switch {
case mode == cfg.ModeFull:
case mode == config.ModeFull:
logger.Info("This node is a fullnode")
case mode == cfg.ModeValidator:
case mode == config.ModeValidator:
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
@@ -180,36 +170,27 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolReactor(
config *cfg.Config,
cfg *config.Config,
proxyApp proxy.AppConns,
state sm.State,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool", "version", config.Mempool.Version)
channelShims := mempoolv0.GetChannelShims(config.Mempool)
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
logger = logger.With("module", "mempool", "version", cfg.Mempool.Version)
peerUpdates := peerManager.Subscribe()
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
switch cfg.Mempool.Version {
case config.MempoolV0:
ch, err := router.OpenChannel(mempoolv0.GetChannelDescriptor(cfg.Mempool))
if err != nil {
return nil, nil, err
}
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
} else {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
}
switch config.Mempool.Version {
case cfg.MempoolV0:
mp := mempoolv0.NewCListMempool(
config.Mempool,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
@@ -221,23 +202,28 @@ func createMempoolReactor(
reactor := mempoolv0.NewReactor(
logger,
config.Mempool,
cfg.Mempool,
peerManager,
mp,
channels[mempool.MempoolChannel],
ch,
peerUpdates,
)
if config.Consensus.WaitForTxs() {
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
return reactorShim, reactor, mp, nil
return reactor, mp, nil
case config.MempoolV1:
ch, err := router.OpenChannel(mempoolv1.GetChannelDescriptor(cfg.Mempool))
if err != nil {
return nil, nil, err
}
case cfg.MempoolV1:
mp := mempoolv1.NewTxMempool(
logger,
config.Mempool,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
@@ -247,227 +233,182 @@ func createMempoolReactor(
reactor := mempoolv1.NewReactor(
logger,
config.Mempool,
cfg.Mempool,
peerManager,
mp,
channels[mempool.MempoolChannel],
ch,
peerUpdates,
)
if config.Consensus.WaitForTxs() {
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
return reactorShim, reactor, mp, nil
return reactor, mp, nil
default:
return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", config.Mempool.Version)
return nil, nil, fmt.Errorf("unknown mempool version: %s", cfg.Mempool.Version)
}
}
func createEvidenceReactor(
config *cfg.Config,
dbProvider cfg.DBProvider,
cfg *config.Config,
dbProvider config.DBProvider,
stateDB dbm.DB,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
) (*evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg})
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
logger = logger.With("module", "evidence")
reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
ch, err := router.OpenChannel(evidence.GetChannelDescriptor())
if err != nil {
return nil, nil, fmt.Errorf("creating evidence channel: %w", err)
}
evidenceReactor := evidence.NewReactor(
logger,
channels[evidence.EvidenceChannel],
peerUpdates,
ch,
peerManager.Subscribe(),
evidencePool,
)
return reactorShim, evidenceReactor, evidencePool, nil
return evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(
logger log.Logger,
config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *cs.Reactor,
csReactor *consensus.Reactor,
peerManager *p2p.PeerManager,
router *p2p.Router,
blockSync bool,
metrics *cs.Metrics,
) (*p2p.ReactorShim, service.Service, error) {
metrics *consensus.Metrics,
) (service.Service, error) {
logger = logger.With("module", "blockchain")
switch config.BlockSync.Version {
case cfg.BlockSyncV0:
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
}
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
channels[bcv0.BlockchainChannel], peerUpdates, blockSync,
metrics,
)
if err != nil {
return nil, nil, err
}
return reactorShim, reactor, nil
case cfg.BlockSyncV2:
return nil, nil, errors.New("block sync version v2 is no longer supported. Please use v0")
default:
return nil, nil, fmt.Errorf("unknown block sync version %s", config.BlockSync.Version)
ch, err := router.OpenChannel(blocksync.GetChannelDescriptor())
if err != nil {
return nil, err
}
peerUpdates := peerManager.Subscribe()
reactor, err := blocksync.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
ch, peerUpdates, blockSync,
metrics,
)
if err != nil {
return nil, err
}
return reactor, nil
}
func createConsensusReactor(
config *cfg.Config,
cfg *config.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mp mempool.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
csMetrics *consensus.Metrics,
waitSync bool,
eventBus *types.EventBus,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
) (*consensus.Reactor, *consensus.State, error) {
consensusState := cs.NewState(
config.Consensus,
consensusState := consensus.NewState(
cfg.Consensus,
state.Copy(),
blockExec,
blockStore,
mp,
evidencePool,
cs.StateMetrics(csMetrics),
consensus.StateMetrics(csMetrics),
)
consensusState.SetLogger(logger)
if privValidator != nil && config.Mode == cfg.ModeValidator {
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(privValidator)
}
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
csChDesc := consensus.GetChannelDescriptors()
channels := make(map[p2p.ChannelID]*p2p.Channel, len(csChDesc))
for idx := range csChDesc {
chd := csChDesc[idx]
ch, err := router.OpenChannel(chd)
if err != nil {
return nil, nil, err
}
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
channels[ch.ID] = ch
}
reactor := cs.NewReactor(
peerUpdates := peerManager.Subscribe()
reactor := consensus.NewReactor(
logger,
consensusState,
channels[cs.StateChannel],
channels[cs.DataChannel],
channels[cs.VoteChannel],
channels[cs.VoteSetBitsChannel],
channels[consensus.StateChannel],
channels[consensus.DataChannel],
channels[consensus.VoteChannel],
channels[consensus.VoteSetBitsChannel],
peerUpdates,
waitSync,
cs.ReactorMetrics(csMetrics),
consensus.ReactorMetrics(csMetrics),
)
// Services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor.
reactor.SetEventBus(eventBus)
return reactorShim, reactor, consensusState
return reactor, consensusState, nil
}
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
return p2p.NewMConnTransport(
logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
logger, conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
len(tmstrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
),
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
}
func createPeerManager(
config *cfg.Config,
dbProvider cfg.DBProvider,
p2pLogger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
nodeID types.NodeID,
) (*p2p.PeerManager, error) {
) (*p2p.PeerManager, closer, error) {
var maxConns uint16
switch {
case config.P2P.MaxConnections > 0:
maxConns = config.P2P.MaxConnections
case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
if x > math.MaxUint16 {
return nil, fmt.Errorf(
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
config.P2P.MaxNumInboundPeers,
config.P2P.MaxNumOutboundPeers,
math.MaxUint16,
)
}
maxConns = uint16(x)
case cfg.P2P.MaxConnections > 0:
maxConns = cfg.P2P.MaxConnections
default:
maxConns = 64
}
privatePeerIDs := make(map[types.NodeID]struct{})
for _, id := range tmstrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") {
privatePeerIDs[types.NodeID(id)] = struct{}{}
}
@@ -483,41 +424,41 @@ func createPeerManager(
}
peers := []p2p.NodeAddress{}
for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
}
for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
}
peerDB, err := dbProvider(&cfg.DBContext{ID: "peerstore", Config: config})
peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
if err != nil {
return nil, err
return nil, func() error { return nil }, err
}
peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
}
for _, peer := range peers {
if _, err := peerManager.Add(peer); err != nil {
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
}
}
return peerManager, nil
return peerManager, peerDB.Close, nil
}
func createRouter(
@@ -541,160 +482,23 @@ func createRouter(
)
}
func createSwitch(
config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
mempoolReactor *p2p.ReactorShim,
bcReactor p2p.Reactor,
stateSyncReactor *p2p.ReactorShim,
consensusReactor *p2p.ReactorShim,
evidenceReactor *p2p.ReactorShim,
proxyApp proxy.AppConns,
nodeInfo types.NodeInfo,
nodeKey types.NodeKey,
p2pLogger log.Logger,
) *p2p.Switch {
var (
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
if !config.P2P.AllowDuplicateIP {
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
}
// Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer.
if config.FilterPeers {
connFilters = append(
connFilters,
// ABCI query for address filtering.
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
},
)
peerFilters = append(
peerFilters,
// ABCI query for ID filtering.
func(_ p2p.IPeerSet, p p2p.Peer) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
},
)
}
sw := p2p.NewSwitch(
config.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
p2p.SwitchConnFilters(connFilters...),
)
sw.SetLogger(p2pLogger)
if config.Mode != cfg.ModeSeed {
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
}
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
return sw
}
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
// Add ourselves to addrbook to prevent dialing ourselves
if config.P2P.ExternalAddress != "" {
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ExternalAddress))
if err != nil {
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
if config.P2P.ListenAddress != "" {
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ListenAddress))
if err != nil {
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
sw.SetAddrBook(addrBook)
return addrBook, nil
}
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger) *pex.Reactor {
reactorConfig := &pex.ReactorConfig{
Seeds: tmstrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.Mode == cfg.ModeSeed,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
}
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook, reactorConfig)
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}
func createPEXReactorV2(
config *cfg.Config,
func createPEXReactor(
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (*pex.ReactorV2, error) {
) (service.Service, error) {
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
channel, err := router.OpenChannel(pex.ChannelDescriptor())
if err != nil {
return nil, err
}
peerUpdates := peerManager.Subscribe()
return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
return pex.NewReactor(logger, peerManager, channel, peerUpdates), nil
}
func makeNodeInfo(
config *cfg.Config,
cfg *config.Config,
nodeKey types.NodeKey,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
@@ -706,17 +510,7 @@ func makeNodeInfo(
txIndexerStatus = "on"
}
var bcChannel byte
switch config.BlockSync.Version {
case cfg.BlockSyncV0:
bcChannel = byte(bcv0.BlockchainChannel)
case cfg.BlockSyncV2:
bcChannel = bcv2.BlockchainChannel
default:
return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", config.BlockSync.Version)
}
bcChannel := byte(blocksync.BlockSyncChannel)
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
@@ -729,31 +523,32 @@ func makeNodeInfo(
Version: version.TMVersion,
Channels: []byte{
bcChannel,
byte(cs.StateChannel),
byte(cs.DataChannel),
byte(cs.VoteChannel),
byte(cs.VoteSetBitsChannel),
byte(consensus.StateChannel),
byte(consensus.DataChannel),
byte(consensus.VoteChannel),
byte(consensus.VoteSetBitsChannel),
byte(mempool.MempoolChannel),
byte(evidence.EvidenceChannel),
byte(statesync.SnapshotChannel),
byte(statesync.ChunkChannel),
byte(statesync.LightBlockChannel),
byte(statesync.ParamsChannel),
},
Moniker: config.Moniker,
Moniker: cfg.Moniker,
Other: types.NodeInfoOther{
TxIndex: txIndexerStatus,
RPCAddress: config.RPC.ListenAddress,
RPCAddress: cfg.RPC.ListenAddress,
},
}
if config.P2P.PexReactor {
if cfg.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := config.P2P.ExternalAddress
lAddr := cfg.P2P.ExternalAddress
if lAddr == "" {
lAddr = config.P2P.ListenAddress
lAddr = cfg.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr
@@ -763,7 +558,7 @@ func makeNodeInfo(
}
func makeSeedNodeInfo(
config *cfg.Config,
cfg *config.Config,
nodeKey types.NodeKey,
genDoc *types.GenesisDoc,
state sm.State,
@@ -778,21 +573,21 @@ func makeSeedNodeInfo(
Network: genDoc.ChainID,
Version: version.TMVersion,
Channels: []byte{},
Moniker: config.Moniker,
Moniker: cfg.Moniker,
Other: types.NodeInfoOther{
TxIndex: "off",
RPCAddress: config.RPC.ListenAddress,
RPCAddress: cfg.RPC.ListenAddress,
},
}
if config.P2P.PexReactor {
if cfg.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := config.P2P.ExternalAddress
lAddr := cfg.P2P.ExternalAddress
if lAddr == "" {
lAddr = config.P2P.ListenAddress
lAddr = cfg.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr