Mirror of https://github.com/tendermint/tendermint.git, synced 2026-01-12 15:52:50 +00:00

Compare commits: feature/ab... to cal/node-c... (10 commits)

7018c73baf
4021df503b
11976a4863
296ec7e113
933256c862
a2610a9998
da8810f480
10dca45e8c
67bc51ad72
3faf580a0d
blocksync/metrics.gen.go (new file, 30 lines)
@@ -0,0 +1,30 @@
+// Code generated by metricsgen. DO NOT EDIT.
+
+package blocksync
+
+import (
+	"github.com/go-kit/kit/metrics/discard"
+	prometheus "github.com/go-kit/kit/metrics/prometheus"
+	stdprometheus "github.com/prometheus/client_golang/prometheus"
+)
+
+func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
+	labels := []string{}
+	for i := 0; i < len(labelsAndValues); i += 2 {
+		labels = append(labels, labelsAndValues[i])
+	}
+	return &Metrics{
+		Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "syncing",
+			Help:      "Whether or not a node is block syncing. 1 if yes, 0 if no.",
+		}, labels).With(labelsAndValues...),
+	}
+}
+
+func NopMetrics() *Metrics {
+	return &Metrics{
+		Syncing: discard.NewGauge(),
+	}
+}
blocksync/metrics.go (new file, 19 lines)
@@ -0,0 +1,19 @@
+package blocksync
+
+import (
+	"github.com/go-kit/kit/metrics"
+)
+
+const (
+	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
+	// package.
+	MetricsSubsystem = "blocksync"
+)
+
+//go:generate go run ../scripts/metricsgen -struct=Metrics
+
+// Metrics contains metrics exposed by this package.
+type Metrics struct {
+	// Whether or not a node is block syncing. 1 if yes, 0 if no.
+	Syncing metrics.Gauge
+}
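The generated constructor takes a namespace plus alternating label key/value pairs, in the same shape as the other metricsgen-generated packages. A minimal sketch of wiring it up; the "tendermint" namespace and "chain_id" label are illustrative choices, not taken from this changeset:

package main

import "github.com/tendermint/tendermint/blocksync"

func main() {
	// Production path: a real gauge registered with the default Prometheus registry.
	m := blocksync.PrometheusMetrics("tendermint", "chain_id", "test-chain")
	m.Syncing.Set(1) // node is block syncing

	// Test path: a no-op implementation that discards every update.
	nop := blocksync.NopMetrics()
	nop.Syncing.Set(0)
}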
blocksync/pool.go
@@ -99,6 +99,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 // OnStart implements service.Service by spawning requesters routine and recording
 // pool's start time.
 func (pool *BlockPool) OnStart() error {
+	if pool.height == 0 {
+		return errors.New("height not set")
+	}
 	go pool.makeRequestersRoutine()
 	pool.startTime = time.Now()
 	return nil
@@ -111,22 +114,19 @@ func (pool *BlockPool) makeRequestersRoutine() {
 			break
 		}

-		_, numPending, lenRequesters := pool.GetStatus()
-		switch {
-		case numPending >= maxPendingRequests:
+		height, maxPeerHeight, numPending, lenRequesters := pool.GetStatus()
+		if height >= maxPeerHeight ||
+			numPending >= maxPendingRequests ||
+			lenRequesters >= maxTotalRequesters {
 			// sleep for a bit.
 			time.Sleep(requestIntervalMS * time.Millisecond)
 			// check for timed out peers
 			pool.removeTimedoutPeers()
-		case lenRequesters >= maxTotalRequesters:
-			// sleep for a bit.
-			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
-			pool.removeTimedoutPeers()
-		default:
-			// request for more blocks.
-			pool.makeNextRequester()
+			continue
 		}
+
+		// request for more blocks.
+		pool.makeNextRequester()
 	}
 }
@@ -156,11 +156,11 @@ func (pool *BlockPool) removeTimedoutPeers() {

 // GetStatus returns pool's height, numPending requests and the number of
 // requesters.
-func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
+func (pool *BlockPool) GetStatus() (height, maxPeerHeight int64, numPending int32, lenRequesters int) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()

-	return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
+	return pool.height, pool.maxPeerHeight, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
 }

 // IsCaughtUp returns true if this node is caught up, false - otherwise.
@@ -302,6 +302,7 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
 	}

 	if height > pool.maxPeerHeight {
+		pool.Logger.Info("new max peer height", "height", height)
 		pool.maxPeerHeight = height
 	}
 }
@@ -388,7 +389,7 @@ func (pool *BlockPool) makeNextRequester() {

 	err := request.Start()
 	if err != nil {
-		request.Logger.Error("Error starting request", "err", err)
+		pool.Logger.Error("Error starting request", "err", err)
 	}
 }
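GetStatus now surfaces maxPeerHeight alongside the previous three values, which is what lets makeRequestersRoutine stop requesting once the pool reaches the best height any peer has advertised. A sketch of a caller inside this package adapting to the widened signature; statusSummary is a hypothetical helper, not part of this changeset:

package blocksync

// statusSummary reports whether the pool has reached the best advertised
// peer height; maxPeerHeight is 0 until a peer reports its range.
func statusSummary(pool *BlockPool) bool {
	height, maxPeerHeight, numPending, lenRequesters := pool.GetStatus()
	_, _ = numPending, lenRequesters // still returned for the reactor's ticker log
	return maxPeerHeight > 0 && height >= maxPeerHeight
}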
blocksync/reactor.go
@@ -32,7 +32,7 @@ const (
 type consensusReactor interface {
 	// for when we switch from blocksync reactor and block sync to
 	// the consensus machine
-	SwitchToConsensus(state sm.State, skipWAL bool)
+	SwitchToConsensus(state sm.State, skipWAL bool) error
 }

 type peerError struct {
@@ -54,16 +54,15 @@ type Reactor struct {
 	blockExec *sm.BlockExecutor
 	store     *store.BlockStore
 	pool      *BlockPool
-	blockSync bool

 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
+
+	metrics *Metrics
 }

 // NewReactor returns new reactor instance.
-func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
-	blockSync bool) *Reactor {
-
+func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, metrics *Metrics) *Reactor {
 	if state.LastBlockHeight != store.Height() {
 		panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
 			store.Height()))
@@ -85,9 +84,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
 		blockExec:  blockExec,
 		store:      store,
 		pool:       pool,
-		blockSync:  blockSync,
 		requestsCh: requestsCh,
 		errorsCh:   errorsCh,
+		metrics:    metrics,
 	}
 	bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
 	return bcR
@@ -101,22 +100,22 @@ func (bcR *Reactor) SetLogger(l log.Logger) {

 // OnStart implements service.Service.
 func (bcR *Reactor) OnStart() error {
-	if bcR.blockSync {
-		err := bcR.pool.Start()
-		if err != nil {
-			return err
-		}
-		go bcR.poolRoutine(false)
-	}
 	return nil
 }

+// IsSyncing returns whether the node is using blocksync to advance heights
+func (bcR *Reactor) IsSyncing() bool {
+	return bcR.pool.IsRunning()
+}
+
 // SwitchToBlockSync is called by the state sync reactor when switching to block sync.
 func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
-	bcR.blockSync = true
 	bcR.initialState = state

-	bcR.pool.height = state.LastBlockHeight + 1
+	if state.LastBlockHeight == 0 {
+		bcR.pool.height = state.InitialHeight
+	} else {
+		bcR.pool.height = state.LastBlockHeight + 1
+	}
 	err := bcR.pool.Start()
 	if err != nil {
 		return err
@@ -127,7 +126,7 @@ func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {

 // OnStop implements service.Service.
 func (bcR *Reactor) OnStop() {
-	if bcR.blockSync {
+	if bcR.pool.IsRunning() {
 		if err := bcR.pool.Stop(); err != nil {
 			bcR.Logger.Error("Error stopping pool", "err", err)
 		}
@@ -236,6 +235,8 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
 // Handle messages from the poolReactor telling the reactor what to do.
 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
 func (bcR *Reactor) poolRoutine(stateSynced bool) {
+	bcR.metrics.Syncing.Set(1)
+	defer bcR.metrics.Syncing.Set(0)

 	trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
 	defer trySyncTicker.Stop()
@@ -293,24 +294,28 @@ FOR_LOOP:
 	for {
 		select {
 		case <-switchToConsensusTicker.C:
-			height, numPending, lenRequesters := bcR.pool.GetStatus()
+			height, peerHeight, numPending, lenRequesters := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()
 			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
-				"outbound", outbound, "inbound", inbound)
+				"outbound", outbound, "inbound", inbound, "peerHeight", peerHeight)
 			if bcR.pool.IsCaughtUp() {
 				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
 				if err := bcR.pool.Stop(); err != nil {
 					bcR.Logger.Error("Error stopping pool", "err", err)
 				}

 				// TODO: node struct should be responsible for switching from block sync to
 				// consensus. It's messy to have to grab the consensus reactor from the switch
 				conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
 				if ok {
-					conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
+					err := conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
+					if err != nil {
+						bcR.Logger.Error("failed to switch to consensus", "err", err)
+					}
 				}
 				// else {
 				// should only happen during testing
 				// }

-				break FOR_LOOP
+				return
 			}

 		case <-trySyncTicker.C: // chan time
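With the blockSync flag gone from the constructor, starting the sync algorithm is now an explicit opt-in via SwitchToBlockSync rather than a side effect of OnStart. A sketch of the new wiring, assuming the caller has already built the state, executor, and store; startBlockSync is a hypothetical helper, not part of this changeset:

package main

import (
	"github.com/tendermint/tendermint/blocksync"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
)

// startBlockSync builds the reactor and explicitly turns block sync on.
func startBlockSync(state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore) (*blocksync.Reactor, error) {
	bcR := blocksync.NewReactor(state.Copy(), blockExec, blockStore, blocksync.NopMetrics())
	// OnStart no longer launches the pool; syncing begins only here (or when
	// the state sync reactor makes this call after restoring a snapshot).
	if err := bcR.SwitchToBlockSync(state); err != nil {
		return nil, err
	}
	return bcR, nil
}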
blocksync/reactor_test.go
@@ -95,14 +95,6 @@ func newReactor(
 		mock.Anything,
 		mock.Anything).Return(nil)

-	// Make the Reactor itself.
-	// NOTE we have to create and commit the blocks first because
-	// pool.height is determined from the store.
-	fastSync := true
-	db := dbm.NewMemDB()
-	stateStore = sm.NewStore(db, sm.StoreOptions{
-		DiscardABCIResponses: false,
-	})
 	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
 		mp, sm.EmptyEvidencePool{}, blockStore)
 	if err = stateStore.Save(state); err != nil {
@@ -145,7 +137,7 @@ func newReactor(
 		blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
 	}

-	bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
+	bcReactor := NewReactor(state.Copy(), blockExec, blockStore, NopMetrics())
 	bcReactor.SetLogger(logger.With("module", "blocksync"))

 	return ReactorPair{bcReactor, proxyApp}
@@ -156,6 +148,9 @@ func TestNoBlockResponse(t *testing.T) {
 	defer os.RemoveAll(config.RootDir)
 	genDoc, privVals := randGenesisDoc(1, false, 30)

+	state, err := sm.MakeGenesisState(genDoc)
+	require.NoError(t, err)
+
 	maxBlockHeight := int64(65)

 	reactorPairs := make([]ReactorPair, 2)
@@ -169,6 +164,12 @@ func TestNoBlockResponse(t *testing.T) {

 	}, p2p.Connect2Switches)

+	for _, reactor := range reactorPairs {
+		// turn on the syncing algorithm
+		err := reactor.reactor.SwitchToBlockSync(state)
+		require.NoError(t, err)
+	}
+
 	defer func() {
 		for _, r := range reactorPairs {
 			err := r.reactor.Stop()
@@ -218,6 +219,9 @@ func TestBadBlockStopsPeer(t *testing.T) {
 	defer os.RemoveAll(config.RootDir)
 	genDoc, privVals := randGenesisDoc(1, false, 30)

+	state, err := sm.MakeGenesisState(genDoc)
+	require.NoError(t, err)
+
 	maxBlockHeight := int64(148)

 	// Other chain needs a different validator set
@@ -244,6 +248,12 @@ func TestBadBlockStopsPeer(t *testing.T) {

 	}, p2p.Connect2Switches)

+	for _, reactor := range reactorPairs {
+		// turn on the syncing algorithm
+		err := reactor.reactor.SwitchToBlockSync(state)
+		require.NoError(t, err)
+	}
+
 	defer func() {
 		for _, r := range reactorPairs {
 			err := r.reactor.Stop()
@@ -287,6 +297,11 @@ func TestBadBlockStopsPeer(t *testing.T) {
 		p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
 	}

+	otherState, err := sm.MakeGenesisState(otherGenDoc)
+	require.NoError(t, err)
+	err = lastReactorPair.reactor.SwitchToBlockSync(otherState)
+	require.NoError(t, err)
+
 	for {
 		if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
 			break
consensus/byzantine_test.go
@@ -46,6 +46,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
 	appFunc := newKVStore

 	genDoc, privVals := randGenesisDoc(nValidators, false, 30)
+	state, err := sm.MakeGenesisState(genDoc)
+	require.NoError(t, err)
 	css := make([]*State, nValidators)

 	for i := 0; i < nValidators; i++ {
@@ -54,7 +56,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
 		stateStore := sm.NewStore(stateDB, sm.StoreOptions{
 			DiscardABCIResponses: false,
 		})
-		state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
 		thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
 		defer os.RemoveAll(thisConfig.RootDir)
 		ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -102,7 +103,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

 		// Make State
 		blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
-		cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
+		cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
 		cs.SetLogger(cs.Logger)
 		// set private validator
 		pv := privVals[i]
@@ -125,7 +126,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
 	blocksSubs := make([]types.Subscription, 0)
 	eventBuses := make([]*types.EventBus, nValidators)
 	for i := 0; i < nValidators; i++ {
-		reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
+		// Note, we dont start the consensus states
+		reactors[i] = NewReactor(css[i])
 		reactors[i].SetLogger(css[i].Logger)

 		// eventBus is already started with the cs
@@ -254,8 +256,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

 	// start the consensus reactors
 	for i := 0; i < nValidators; i++ {
-		s := reactors[i].conS.GetState()
-		reactors[i].SwitchToConsensus(s, false)
+		require.NoError(t, reactors[i].SwitchToConsensus(state.Copy(), false))
 	}
 	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

@@ -314,7 +315,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
 	N := 4
 	logger := consensusLogger().With("test", "byzantine")
 	app := newKVStore
-	css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app)
+	css, cleanup := randConsensusNet(t, N, "consensus_byzantine_test", newMockTickerFunc(false), app)
 	defer cleanup()

 	// give the byzantine validator a normal ticker
@@ -363,7 +364,8 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
 		blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
 		require.NoError(t, err)

-		conR := NewReactor(css[i], true) // so we don't start the consensus states
+		// Note, we don't start the consensus states
+		conR := NewReactor(css[i])
 		conR.SetLogger(logger.With("validator", i))
 		conR.SetEventBus(eventBus)

@@ -407,13 +409,13 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
 	// note these must be started before the byz
 	for i := 1; i < N; i++ {
 		cr := reactors[i].(*Reactor)
-		cr.SwitchToConsensus(cr.conS.GetState(), false)
+		require.NoError(t, cr.SwitchToConsensus(cr.conS.GetState(), false))
 	}

 	// start the byzantine state machine
 	byzR := reactors[0].(*ByzantineReactor)
 	s := byzR.reactor.conS.GetState()
-	byzR.reactor.SwitchToConsensus(s, false)
+	require.NoError(t, byzR.reactor.SwitchToConsensus(s, false))

 	// byz proposer sends one block to peers[0]
 	// and the other block to peers[1] and peers[2].
@@ -592,7 +594,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {

 	// Send our state to peer.
 	// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
-	if !br.reactor.waitSync {
+	if br.reactor.conS.IsRunning() {
 		br.reactor.sendNewRoundStepMessage(peer)
 	}
 }
consensus/common_test.go
@@ -440,7 +440,7 @@ func newStateWithConfigAndBlockStore(
 	}

 	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
-	cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
+	cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
 	cs.SetLogger(log.TestingLogger().With("module", "consensus"))
 	cs.SetPrivValidator(pv)

@@ -747,18 +747,17 @@ func consensusLogger() log.Logger {
 	}).With("module", "consensus")
 }

-func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker,
+func randConsensusNet(t *testing.T, nValidators int, testName string, tickerFunc func() TimeoutTicker,
 	appFunc func() abci.Application, configOpts ...func(*cfg.Config)) ([]*State, cleanupFunc) {
+	t.Helper()
 	genDoc, privVals := randGenesisDoc(nValidators, false, 30)
+	state, err := sm.MakeGenesisState(genDoc)
+	require.NoError(t, err)
 	css := make([]*State, nValidators)
 	logger := consensusLogger()
 	configRootDirs := make([]string, 0, nValidators)
 	for i := 0; i < nValidators; i++ {
 		stateDB := dbm.NewMemDB() // each state needs its own db
 		stateStore := sm.NewStore(stateDB, sm.StoreOptions{
 			DiscardABCIResponses: false,
 		})
-		state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
 		thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
 		configRootDirs = append(configRootDirs, thisConfig.RootDir)
 		for _, opt := range configOpts {
@@ -772,6 +771,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
 		css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
 		css[i].SetTimeoutTicker(tickerFunc())
 		css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
+		css[i].updateToState(state.Copy())
 	}
 	return css, func() {
 		for _, dir := range configRootDirs {
consensus/invalid_test.go
@@ -19,7 +19,7 @@ import (
 // Ensure a testnet makes blocks
 func TestReactorInvalidPrecommit(t *testing.T) {
 	N := 4
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	defer cleanup()

 	for i := 0; i < 4; i++ {
consensus/metrics.gen.go
@@ -118,18 +118,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
 			Name:      "latest_block_height",
 			Help:      "The latest block height.",
 		}, labels).With(labelsAndValues...),
-		BlockSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: MetricsSubsystem,
-			Name:      "block_syncing",
-			Help:      "Whether or not a node is block syncing. 1 if yes, 0 if no.",
-		}, labels).With(labelsAndValues...),
-		StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: MetricsSubsystem,
-			Name:      "state_syncing",
-			Help:      "Whether or not a node is state syncing. 1 if yes, 0 if no.",
-		}, labels).With(labelsAndValues...),
 		BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: MetricsSubsystem,
@@ -208,8 +196,6 @@ func NopMetrics() *Metrics {
 		BlockSizeBytes:           discard.NewGauge(),
 		TotalTxs:                 discard.NewGauge(),
 		CommittedHeight:          discard.NewGauge(),
-		BlockSyncing:             discard.NewGauge(),
-		StateSyncing:             discard.NewGauge(),
 		BlockParts:               discard.NewCounter(),
 		StepDurationSeconds:      discard.NewHistogram(),
 		BlockGossipPartsReceived: discard.NewCounter(),
consensus/metrics.go
@@ -61,10 +61,6 @@ type Metrics struct {
 	TotalTxs metrics.Gauge
 	// The latest block height.
 	CommittedHeight metrics.Gauge `metrics_name:"latest_block_height"`
-	// Whether or not a node is block syncing. 1 if yes, 0 if no.
-	BlockSyncing metrics.Gauge
-	// Whether or not a node is state syncing. 1 if yes, 0 if no.
-	StateSyncing metrics.Gauge

 	// Number of block parts transmitted by each peer.
 	BlockParts metrics.Counter `metrics_labels:"peer_id"`
consensus/reactor.go
@@ -42,7 +42,6 @@ type Reactor struct {
 	conS *State

 	mtx      tmsync.RWMutex
-	waitSync bool
 	eventBus *types.EventBus
 	rs       *cstypes.RoundState

@@ -53,12 +52,11 @@ type ReactorOption func(*Reactor)

 // NewReactor returns a new Reactor with the given
 // consensusState.
-func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
+func NewReactor(consensusState *State, options ...ReactorOption) *Reactor {
 	conR := &Reactor{
-		conS:     consensusState,
-		waitSync: waitSync,
-		rs:       consensusState.GetRoundState(),
-		Metrics:  NopMetrics(),
+		conS:    consensusState,
+		rs:      consensusState.GetRoundState(),
+		Metrics: NopMetrics(),
 	}
 	conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

@@ -72,21 +70,12 @@ func NewReactor(consensusState *State, options ...ReactorOption) *Reactor {
 // OnStart implements BaseService by subscribing to events, which later will be
 // broadcasted to other peers and starting state if we're not in block sync.
 func (conR *Reactor) OnStart() error {
-	conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync())
-
 	// start routine that computes peer statistics for evaluating peer quality
 	go conR.peerStatsRoutine()

 	conR.subscribeToBroadcastEvents()
 	go conR.updateRoundStateRoutine()

-	if !conR.WaitSync() {
-		err := conR.conS.Start()
-		if err != nil {
-			return err
-		}
-	}
-
 	return nil
 }

@@ -94,47 +83,34 @@ func (conR *Reactor) OnStart() error {
 // state.
 func (conR *Reactor) OnStop() {
 	conR.unsubscribeFromBroadcastEvents()
-	if err := conR.conS.Stop(); err != nil {
-		conR.Logger.Error("Error stopping consensus state", "err", err)
-	}
-	if !conR.WaitSync() {
+	if conR.conS.IsRunning() {
+		if err := conR.conS.Stop(); err != nil {
+			conR.Logger.Error("Error stopping consensus state", "err", err)
+		}
 		conR.conS.Wait()
 	}
 }

+func (conR *Reactor) IsConsensusRunning() bool {
+	return conR.conS.IsRunning()
+}
+
 // SwitchToConsensus switches from block_sync mode to consensus mode.
 // It resets the state, turns off block_sync, and starts the consensus state-machine
-func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
+func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) error {
 	conR.Logger.Info("SwitchToConsensus")

 	// We have no votes, so reconstruct LastCommit from SeenCommit.
-	if state.LastBlockHeight > 0 {
+	if state.LastBlockHeight > state.InitialHeight {
 		conR.conS.reconstructLastCommit(state)
 	}

 	// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
 	// NewRoundStepMessage.
 	conR.conS.updateToState(state)

-	conR.mtx.Lock()
-	conR.waitSync = false
-	conR.mtx.Unlock()
-	conR.Metrics.BlockSyncing.Set(0)
-	conR.Metrics.StateSyncing.Set(0)
-
 	if skipWAL {
 		conR.conS.doWALCatchup = false
 	}
-	err := conR.conS.Start()
-	if err != nil {
-		panic(fmt.Sprintf(`Failed to start consensus state: %v
-
-conS:
-%+v
-
-conR:
-%+v`, err, conR.conS, conR))
-	}
+	return conR.conS.Start()
 }

 // GetChannels implements Reactor
@@ -201,7 +177,7 @@ func (conR *Reactor) AddPeer(peer p2p.Peer) {

 	// Send our state to peer.
 	// If we're block_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
-	if !conR.WaitSync() {
+	if conR.conS.IsRunning() {
 		conR.sendNewRoundStepMessage(peer)
 	}
 }
@@ -311,7 +287,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
 		}

 	case DataChannel:
-		if conR.WaitSync() {
+		if !conR.conS.IsRunning() {
 			conR.Logger.Info("Ignoring message received during sync", "msg", msg)
 			return
 		}
@@ -330,7 +306,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
 		}

 	case VoteChannel:
-		if conR.WaitSync() {
+		if !conR.conS.IsRunning() {
 			conR.Logger.Info("Ignoring message received during sync", "msg", msg)
 			return
 		}
@@ -352,7 +328,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
 		}

 	case VoteSetBitsChannel:
-		if conR.WaitSync() {
+		if !conR.conS.IsRunning() {
 			conR.Logger.Info("Ignoring message received during sync", "msg", msg)
 			return
 		}
@@ -393,13 +369,6 @@ func (conR *Reactor) SetEventBus(b *types.EventBus) {
 	conR.conS.SetEventBus(b)
 }

-// WaitSync returns whether the consensus reactor is waiting for state/block sync.
-func (conR *Reactor) WaitSync() bool {
-	conR.mtx.RLock()
-	defer conR.mtx.RUnlock()
-	return conR.waitSync
-}
-
 //--------------------------------------

 // subscribeToBroadcastEvents subscribes for new round steps and votes
@@ -543,6 +512,11 @@ OUTER_LOOP:
 		if !peer.IsRunning() || !conR.IsRunning() {
 			return
 		}
+		if !conR.IsConsensusRunning() {
+			time.Sleep(conR.conS.config.PeerGossipSleepDuration)
+			continue OUTER_LOOP
+		}
+
 		rs := conR.getRoundState()
 		prs := ps.GetRoundState()

@@ -687,7 +661,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
 		}
 		return
 	}
-	// logger.Info("No parts to send in catch-up, sleeping")
+
 	time.Sleep(conR.conS.config.PeerGossipSleepDuration)
 }

@@ -697,12 +671,16 @@ func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
 	// Simple hack to throttle logs upon sleep.
 	var sleeping = 0

OUTER_LOOP:
 	for {
 		// Manage disconnects from self or peer.
 		if !peer.IsRunning() || !conR.IsRunning() {
 			return
 		}
+		if !conR.IsConsensusRunning() {
+			time.Sleep(conR.conS.config.PeerGossipSleepDuration)
+			continue
+		}

 		rs := conR.getRoundState()
 		prs := ps.GetRoundState()

@@ -713,14 +691,11 @@ OUTER_LOOP:
 			sleeping = 0
 		}

-		// logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
-		//	"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
-
 		// If height matches, then send LastCommit, Prevotes, Precommits.
 		if rs.Height == prs.Height {
 			heightLogger := logger.With("height", prs.Height)
 			if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
-				continue OUTER_LOOP
+				continue
 			}
 		}

@@ -729,7 +704,7 @@ OUTER_LOOP:
 		if prs.Height != 0 && rs.Height == prs.Height+1 {
 			if ps.PickSendVote(rs.LastCommit) {
 				logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
-				continue OUTER_LOOP
+				continue
 			}
 		}

@@ -742,7 +717,7 @@ OUTER_LOOP:
 			if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
 				if ps.PickSendVote(commit) {
 					logger.Debug("Picked Catchup commit to send", "height", prs.Height)
-					continue OUTER_LOOP
+					continue
 				}
 			}
 		}
@@ -759,7 +734,7 @@ OUTER_LOOP:
 		}

 		time.Sleep(conR.conS.config.PeerGossipSleepDuration)
-		continue OUTER_LOOP
+		continue
 	}
 }

@@ -833,6 +808,11 @@ OUTER_LOOP:
 			return
 		}

+		if !conR.IsConsensusRunning() {
+			time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
+			continue OUTER_LOOP
+		}
+
 		// Maybe send Height/Round/Prevotes
 		{
 			rs := conR.getRoundState()
@@ -918,8 +898,6 @@ OUTER_LOOP:
 			}
 		}

 		time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
-
-		continue OUTER_LOOP
 	}
 }
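Call sites change in two ways: NewReactor loses the waitSync flag, and SwitchToConsensus now reports the consensus-state start error instead of panicking inside the reactor. A sketch of the updated pattern; startConsensus is a hypothetical helper, not part of this changeset:

package main

import (
	cs "github.com/tendermint/tendermint/consensus"
	sm "github.com/tendermint/tendermint/state"
)

// startConsensus builds the reactor without starting the consensus state,
// then switches to consensus and surfaces any start error to the caller.
func startConsensus(conState *cs.State, state sm.State) (*cs.Reactor, error) {
	conR := cs.NewReactor(conState)
	if err := conR.SwitchToConsensus(state, false); err != nil {
		return nil, err
	}
	return conR, nil
}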
consensus/reactor_test.go
@@ -55,9 +55,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
 	blocksSubs := make([]types.Subscription, 0)
 	eventBuses := make([]*types.EventBus, n)
 	for i := 0; i < n; i++ {
-		/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
-		if err != nil {	t.Fatal(err)}*/
-		reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
+		// Note, we dont start the consensus states
+		reactors[i] = NewReactor(css[i])
 		reactors[i].SetLogger(css[i].Logger)

 		// eventBus is already started with the cs
@@ -88,7 +87,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
 	// TODO: is this still true with new pubsub?
 	for i := 0; i < n; i++ {
 		s := reactors[i].conS.GetState()
-		reactors[i].SwitchToConsensus(s, false)
+		err := reactors[i].SwitchToConsensus(s, false)
+		require.NoError(t, err)
 	}
 	return reactors, blocksSubs, eventBuses
 }
@@ -113,7 +113,7 @@ func stopConsensusNet(logger log.Logger, reactors []*Reactor, eventBuses []*type
 // Ensure a testnet makes blocks
 func TestReactorBasic(t *testing.T) {
 	N := 4
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	defer cleanup()
 	reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
 	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -135,6 +135,8 @@ func TestReactorWithEvidence(t *testing.T) {
 	// css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)

 	genDoc, privVals := randGenesisDoc(nValidators, false, 30)
+	state, err := sm.MakeGenesisState(genDoc)
+	require.NoError(t, err)
 	css := make([]*State, nValidators)
 	logger := consensusLogger()
 	for i := 0; i < nValidators; i++ {
@@ -142,7 +144,6 @@ func TestReactorWithEvidence(t *testing.T) {
 		stateStore := sm.NewStore(stateDB, sm.StoreOptions{
 			DiscardABCIResponses: false,
 		})
-		state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
 		thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
 		defer os.RemoveAll(thisConfig.RootDir)
 		ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -203,7 +204,7 @@ func TestReactorWithEvidence(t *testing.T) {

 		// Make State
 		blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
-		cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
+		cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool2, WithState(state.Copy()))
 		cs.SetLogger(log.TestingLogger().With("module", "consensus"))
 		cs.SetPrivValidator(pv)

@@ -237,7 +238,7 @@ func TestReactorWithEvidence(t *testing.T) {
 // Ensure a testnet makes blocks when there are txs
 func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
 	N := 4
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore,
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore,
 		func(c *cfg.Config) {
 			c.Consensus.CreateEmptyBlocks = false
 		})
@@ -258,7 +259,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {

 func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
 	N := 1
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	defer cleanup()
 	reactors, _, eventBuses := startConsensusNet(t, css, N)
 	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -284,7 +285,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {

 func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
 	N := 1
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	defer cleanup()
 	reactors, _, eventBuses := startConsensusNet(t, css, N)
 	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -310,7 +311,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
 // Test we record stats about votes and block parts from other peers.
 func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
 	N := 4
-	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
 	defer cleanup()
 	reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
 	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -336,6 +337,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
 	nVals := 4
 	logger := log.TestingLogger()
 	css, cleanup := randConsensusNet(
+		t,
 		nVals,
 		"consensus_voting_power_changes_test",
 		newMockTickerFunc(true),
@@ -531,7 +533,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
 // Check we can make blocks with skip_timeout_commit=false
 func TestReactorWithTimeoutCommit(t *testing.T) {
 	N := 4
-	css, cleanup := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore)
+	css, cleanup := randConsensusNet(t, N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore)
 	defer cleanup()
 	// override default SkipTimeoutCommit == true for tests
 	for i := 0; i < N; i++ {
consensus/replay_file.go
@@ -129,8 +129,8 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
 	}
 	pb.cs.Wait()

-	newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
-		pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
+	newCS := NewState(pb.cs.config, pb.cs.blockExec,
+		pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, WithState(pb.genesisState.Copy()))
 	newCS.SetEventBus(pb.cs.eventBus)
 	newCS.startForReplay()

@@ -332,8 +332,8 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
 	mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
 	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)

-	consensusState := NewState(csConfig, state.Copy(), blockExec,
-		blockStore, mempool, evpool)
+	consensusState := NewState(csConfig, blockExec,
+		blockStore, mempool, evpool, WithState(state.Copy()))

 	consensusState.SetEventBus(eventBus)
 	return consensusState
consensus/replay_test.go
@@ -67,7 +67,8 @@ func TestMain(m *testing.M) {
 func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
 	lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
 	logger := log.TestingLogger()
-	state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
+	state, err := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
+	require.NoError(t, err)
 	privValidator := loadPrivValidator(consensusReplayConfig)
 	cs := newStateWithConfigAndBlockStore(
 		consensusReplayConfig,
@@ -81,7 +82,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
 	bytes, _ := os.ReadFile(cs.config.WalFile())
 	t.Logf("====== WAL: \n\r%X\n", bytes)

-	err := cs.Start()
+	err = cs.Start()
 	require.NoError(t, err)
 	defer func() {
 		if err := cs.Stop(); err != nil {
@@ -555,40 +556,40 @@ func TestSimulateValidatorsChange(t *testing.T) {
 // Sync from scratch
 func TestHandshakeReplayAll(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, config, 0, m, false)
+		testHandshakeReplay(t, 0, m, false)
 	}
 	for _, m := range modes {
-		testHandshakeReplay(t, config, 0, m, true)
+		testHandshakeReplay(t, 0, m, true)
 	}
 }

 // Sync many, not from scratch
 func TestHandshakeReplaySome(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, config, 2, m, false)
+		testHandshakeReplay(t, 2, m, false)
 	}
 	for _, m := range modes {
-		testHandshakeReplay(t, config, 2, m, true)
+		testHandshakeReplay(t, 2, m, true)
 	}
 }

 // Sync from lagging by one
 func TestHandshakeReplayOne(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, config, numBlocks-1, m, false)
+		testHandshakeReplay(t, numBlocks-1, m, false)
 	}
 	for _, m := range modes {
-		testHandshakeReplay(t, config, numBlocks-1, m, true)
+		testHandshakeReplay(t, numBlocks-1, m, true)
 	}
 }

 // Sync from caught up
 func TestHandshakeReplayNone(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, config, numBlocks, m, false)
+		testHandshakeReplay(t, numBlocks, m, false)
 	}
 	for _, m := range modes {
-		testHandshakeReplay(t, config, numBlocks, m, true)
+		testHandshakeReplay(t, numBlocks, m, true)
 	}
 }

@@ -660,25 +661,27 @@ func tempWALWithData(data []byte) string {

 // Make some blocks. Start a fresh app and apply nBlocks blocks.
 // Then restart the app and sync it up with the remaining blocks
-func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) {
-	var chain []*types.Block
-	var commits []*types.Commit
-	var store *mockBlockStore
-	var stateDB dbm.DB
-	var genesisState sm.State
+func testHandshakeReplay(t *testing.T, nBlocks int, mode uint, testValidatorsChange bool) {
+	var (
+		chain        []*types.Block
+		commits      []*types.Commit
+		store        *mockBlockStore
+		stateDB      dbm.DB
+		genesisState sm.State
+		config       *cfg.Config
+	)
 	if testValidatorsChange {
-		testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
-		defer os.RemoveAll(testConfig.RootDir)
+		config = ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
+		defer os.RemoveAll(config.RootDir)
 		stateDB = dbm.NewMemDB()

 		genesisState = sim.GenesisState
 		config = sim.Config
 		chain = append([]*types.Block{}, sim.Chain...) // copy chain
 		commits = sim.Commits
 		store = newMockBlockStore(t, config, genesisState.ConsensusParams)
 	} else { // test single node
-		testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
-		defer os.RemoveAll(testConfig.RootDir)
+		config = ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
+		defer os.RemoveAll(config.RootDir)
 		walBody, err := WALWithNBlocks(t, numBlocks)
 		require.NoError(t, err)
 		walFile := tempWALWithData(walBody)
@@ -811,14 +814,11 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm

 	state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
 	validators := types.TM2PB.ValidatorUpdates(state.Validators)
-	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
+	_, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
 		Validators: validators,
-	}); err != nil {
-		panic(err)
-	}
-	if err := stateStore.Save(state); err != nil { // save height 1's validatorsInfo
-		panic(err)
-	}
+	})
+	require.NoError(t, err)
+	require.NoError(t, stateStore.Save(state))
 	switch mode {
 	case 0:
 		for i := 0; i < nBlocks; i++ {
consensus/state.go
@@ -148,7 +148,6 @@ type StateOption func(*State)

 // NewState returns a new State.
 func NewState(
 	config *cfg.ConsensusConfig,
-	state sm.State,
 	blockExec *sm.BlockExecutor,
 	blockStore sm.BlockStore,
 	txNotifier txNotifier,
@@ -177,13 +176,6 @@ func NewState(
 	cs.doPrevote = cs.defaultDoPrevote
 	cs.setProposal = cs.defaultSetProposal

-	// We have no votes, so reconstruct LastCommit from SeenCommit.
-	if state.LastBlockHeight > 0 {
-		cs.reconstructLastCommit(state)
-	}
-
-	cs.updateToState(state)
-
 	// NOTE: we do not call scheduleRound0 yet, we do that upon Start()

 	cs.BaseService = *service.NewBaseService(nil, "State", cs)
@@ -207,10 +199,19 @@ func (cs *State) SetEventBus(b *types.EventBus) {
 }

 // StateMetrics sets the metrics.
-func StateMetrics(metrics *Metrics) StateOption {
+func WithMetrics(metrics *Metrics) StateOption {
 	return func(cs *State) { cs.metrics = metrics }
 }

+func WithState(state sm.State) StateOption {
+	return func(cs *State) {
+		if state.LastBlockHeight > 0 {
+			cs.reconstructLastCommit(state)
+		}
+		cs.updateToState(state)
+	}
+}
+
 // String returns a string.
 func (cs *State) String() string {
 	// better not to access shared variables
@@ -297,6 +298,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
 // OnStart loads the latest state via the WAL, and starts the timeout and
 // receive routines.
 func (cs *State) OnStart() error {
+	if cs.state.IsEmpty() {
+		return errors.New("no state to commence consensus on")
+	}
+
 	// We may set the WAL in testing before calling Start, so only OpenWAL if its
 	// still the nilWAL.
 	if _, ok := cs.wal.(nilWAL); ok {
@@ -612,7 +617,7 @@ func (cs *State) updateToState(state sm.State) {
 		// signal the new round step, because other services (eg. txNotifier)
 		// depend on having an up-to-date peer state!
 		if state.LastBlockHeight <= cs.state.LastBlockHeight {
-			cs.Logger.Debug(
+			cs.Logger.Info(
 				"ignoring updateToState()",
 				"new_height", state.LastBlockHeight+1,
 				"old_height", cs.state.LastBlockHeight+1,
@@ -2275,11 +2280,10 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
 		return nil
 	}

-	// TODO: pass pubKey to signVote
 	vote, err := cs.signVote(msgType, hash, header)
 	if err == nil {
 		cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
-		cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
+		cs.Logger.Info("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
 		return vote
 	}

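Moving the initial state and metrics out of NewState's positional parameters and into StateOption values is a standard functional-options refactor: required dependencies stay positional, optional setup becomes composable. A distilled, self-contained illustration of the pattern; every name here is invented for the example, and only the WithState/WithMetrics shape mirrors this changeset:

package main

import "fmt"

type State struct {
	height  int64
	metrics string
}

type StateOption func(*State)

func WithHeight(h int64) StateOption   { return func(s *State) { s.height = h } }
func WithMetrics(m string) StateOption { return func(s *State) { s.metrics = m } }

// NewState applies defaults first, then options, mirroring how the consensus
// State now defers reconstructLastCommit/updateToState into WithState.
func NewState(opts ...StateOption) *State {
	s := &State{metrics: "nop"}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := NewState(WithHeight(10), WithMetrics("prometheus"))
	fmt.Println(s.height, s.metrics)
}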
consensus/types/height_vote_set.go
@@ -121,7 +121,7 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool,
 		return
 	}
 	voteSet := hvs.getVoteSet(vote.Round, vote.Type)
-	if voteSet == nil {
+	if voteSet.IsEmpty() {
 		if rndz := hvs.peerCatchupRounds[peerID]; len(rndz) < 2 {
 			hvs.addRound(vote.Round)
 			voteSet = hvs.getVoteSet(vote.Round, vote.Type)
@@ -166,7 +166,7 @@ func (hvs *HeightVoteSet) POLInfo() (polRound int32, polBlockID types.BlockID) {
 func (hvs *HeightVoteSet) getVoteSet(round int32, voteType tmproto.SignedMsgType) *types.VoteSet {
 	rvs, ok := hvs.roundVoteSets[round]
 	if !ok {
-		return nil
+		return &types.VoteSet{}
 	}
 	switch voteType {
 	case tmproto.PrevoteType:
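Returning a zero-value VoteSet instead of nil turns the missing-round case into a "null object": callers such as AddVote can ask voteSet.IsEmpty() without a nil check that could be forgotten. A distilled, runnable illustration of the idea; the types here are invented for the example:

package main

import "fmt"

type VoteSet struct{ votes []string }

func (vs *VoteSet) IsEmpty() bool { return len(vs.votes) == 0 }

var roundVoteSets = map[int32]*VoteSet{}

func getVoteSet(round int32) *VoteSet {
	vs, ok := roundVoteSets[round]
	if !ok {
		return &VoteSet{} // was: return nil
	}
	return vs
}

func main() {
	// Safe even for a round nobody has voted in yet.
	fmt.Println(getVoteSet(999).IsEmpty()) // true
}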
consensus/types/height_vote_set_test.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"testing"

+	"github.com/stretchr/testify/require"
 	cfg "github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/crypto/tmhash"
 	"github.com/tendermint/tendermint/internal/test"
@@ -30,30 +31,23 @@ func TestPeerCatchupRounds(t *testing.T) {

 	vote999_0 := makeVoteHR(t, 1, 0, 999, privVals)
 	added, err := hvs.AddVote(vote999_0, "peer1")
-	if !added || err != nil {
-		t.Error("Expected to successfully add vote from peer", added, err)
-	}
+	require.NoError(t, err)
+	require.True(t, added)

 	vote1000_0 := makeVoteHR(t, 1, 0, 1000, privVals)
 	added, err = hvs.AddVote(vote1000_0, "peer1")
-	if !added || err != nil {
-		t.Error("Expected to successfully add vote from peer", added, err)
-	}
+	require.NoError(t, err)
+	require.True(t, added)

 	vote1001_0 := makeVoteHR(t, 1, 0, 1001, privVals)
 	added, err = hvs.AddVote(vote1001_0, "peer1")
-	if err != ErrGotVoteFromUnwantedRound {
-		t.Errorf("expected GotVoteFromUnwantedRoundError, but got %v", err)
-	}
-	if added {
-		t.Error("Expected to *not* add vote from peer, too many catchup rounds.")
-	}
+	require.Error(t, err)
+	require.Equal(t, ErrGotVoteFromUnwantedRound, err)
+	require.False(t, added)

 	added, err = hvs.AddVote(vote1001_0, "peer2")
-	if !added || err != nil {
-		t.Error("Expected to successfully add vote from another peer")
-	}
-
+	require.NoError(t, err)
+	require.True(t, added)
 }

 func makeVoteHR(t *testing.T, height int64, valIndex, round int32, privVals []types.PrivValidator) *types.Vote {
consensus/wal_generator.go
@@ -86,7 +86,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
 	mempool := emptyMempool{}
 	evpool := sm.EmptyEvidencePool{}
 	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
-	consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
+	consensusState := NewState(config.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
 	consensusState.SetLogger(logger)
 	consensusState.SetEventBus(eventBus)
 	if privValidator != nil {
node/node.go (266 lines changed)
@@ -11,7 +11,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/rs/cors"

-	bc "github.com/tendermint/tendermint/blocksync"
+	"github.com/tendermint/tendermint/blocksync"
 	cfg "github.com/tendermint/tendermint/config"
 	cs "github.com/tendermint/tendermint/consensus"
 	"github.com/tendermint/tendermint/evidence"
@@ -50,7 +50,6 @@ type Node struct {
 	privValidator types.PrivValidator // local node's validator key

 	// network
-	transport *p2p.MultiplexTransport
 	sw        *p2p.Switch  // p2p connections
 	addrBook  pex.AddrBook // known peers
 	nodeInfo  p2p.NodeInfo
@@ -60,11 +59,10 @@ type Node struct {
 	// services
 	eventBus   *types.EventBus // pub/sub for services
 	stateStore sm.Store
-	blockStore     *store.BlockStore // store the blockchain to disk
-	bcReactor      p2p.Reactor       // for block-syncing
-	mempoolReactor p2p.Reactor       // for gossipping transactions
+	blockStore     *store.BlockStore  // store the blockchain to disk
+	bcReactor      *blocksync.Reactor // for block-syncing
+	mempoolReactor p2p.Reactor        // for gossipping transactions
 	mempool        mempl.Mempool
-	stateSync         bool                    // whether the node should state sync on startup
 	stateSyncReactor  *statesync.Reactor      // for hosting and restoring state sync snapshots
 	stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
 	stateSyncGenesis  sm.State                // provides the genesis state for state sync
@@ -79,6 +77,7 @@ type Node struct {
 	indexerService *txindex.IndexerService
 	prometheusSrv  *http.Server
 	pprofSrv       *http.Server
+	customReactors map[string]p2p.Reactor
 }

 // Option sets a parameter for the node.
@@ -98,29 +97,7 @@ type Option func(*Node)
 // - STATESYNC
 func CustomReactors(reactors map[string]p2p.Reactor) Option {
 	return func(n *Node) {
-		for name, reactor := range reactors {
-			if existingReactor := n.sw.Reactor(name); existingReactor != nil {
-				n.sw.Logger.Info("Replacing existing reactor with a custom one",
-					"name", name, "existing", existingReactor, "custom", reactor)
-				n.sw.RemoveReactor(name, existingReactor)
-			}
-			n.sw.AddReactor(name, reactor)
-			// register the new channels to the nodeInfo
-			// NOTE: This is a bit messy now with the type casting but is
-			// cleaned up in the following version when NodeInfo is changed from
-			// and interface to a concrete type
-			if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
-				for _, chDesc := range reactor.GetChannels() {
-					if !ni.HasChannel(chDesc.ID) {
-						ni.Channels = append(ni.Channels, chDesc.ID)
-						n.transport.AddChannel(chDesc.ID)
-					}
-				}
-				n.nodeInfo = ni
-			} else {
-				n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
-			}
-		}
+		n.customReactors = reactors
 	}
 }
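CustomReactors now only records the map; the actual replacement happens once, at the end of NewNode, after the built-in reactors exist (see the loop added near the end of NewNode below). Usage stays the same for callers. A sketch; buildNodeOptions is a hypothetical helper, not part of this changeset:

package main

import (
	"github.com/tendermint/tendermint/node"
	"github.com/tendermint/tendermint/p2p"
)

// buildNodeOptions shows a caller swapping in a custom mempool reactor; a
// name not already on the switch is simply added rather than replaced.
func buildNodeOptions(custom p2p.Reactor) node.Option {
	return node.CustomReactors(map[string]p2p.Reactor{
		"MEMPOOL": custom,
	})
}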
@@ -160,74 +137,26 @@ func NewNode(config *cfg.Config,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
|
||||
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)
|
||||
|
||||
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
|
||||
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
|
||||
proxyApp, err := createProxyAppConns(clientCreator, logger, abciMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
|
||||
}
|
||||
|
||||
// EventBus and IndexerService must be started before the handshake because
|
||||
// we might need to index the txs of the replayed block as this might not have happened
|
||||
// when the node stopped last time (i.e. the node stopped after it saved the block
|
||||
// but before it indexed the txs, or, endblocker panicked)
|
||||
eventBus, err := createAndStartEventBus(logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventBus := createEventBus(logger)
|
||||
|
||||
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
|
||||
indexerService, txIndexer, blockIndexer, err := createIndexerService(config,
|
||||
genDoc.ChainID, dbProvider, eventBus, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If an address is provided, listen on the socket for a connection from an
|
||||
// external signing process.
|
||||
if config.PrivValidatorListenAddr != "" {
|
||||
// FIXME: we should start services inside OnStart
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error with private validator socket client: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
pubKey, err := privValidator.GetPubKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't get pubkey: %w", err)
|
||||
}
|
||||
|
||||
// Determine whether we should attempt state sync.
|
||||
stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
|
||||
if stateSync && state.LastBlockHeight > 0 {
|
||||
logger.Info("Found local state with non-zero height, skipping state sync")
|
||||
stateSync = false
|
||||
}
|
||||
|
||||
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
|
||||
// and replays any blocks as necessary to sync tendermint with the app.
|
||||
consensusLogger := logger.With("module", "consensus")
|
||||
if !stateSync {
|
||||
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reload the state. It will have the Version.Consensus.App set by the
|
||||
// Handshake, and may have other modifications as well (ie. depending on
|
||||
// what happened during block replay).
|
||||
state, err = stateStore.Load()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load state: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
logNodeStartupInfo(state, pubKey, logger, consensusLogger)
|
||||
|
||||
// Make MempoolReactor
|
||||
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
|
||||
|
||||
@@ -249,21 +178,16 @@ func NewNode(config *cfg.Config,
	)

	// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
	bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
	bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, bsMetrics, logger)
	if err != nil {
		return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
	}

	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
	// FIXME We need to update metrics here, since other reactors don't have access to them.
	if stateSync {
		csMetrics.StateSyncing.Set(1)
	} else if blockSync {
		csMetrics.BlockSyncing.Set(1)
	}
	consensusLogger := logger.With("module", "consensus")
	consensusReactor, consensusState := createConsensusReactor(
		config, state, blockExec, blockStore, mempool, evidencePool,
		privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger,
		config, blockExec, blockStore, mempool, evidencePool,
		privValidator, csMetrics, eventBus, consensusLogger,
	)

	// Set up state sync reactor, and schedule a sync if requested.
@@ -275,22 +199,15 @@ func NewNode(config *cfg.Config,
		proxyApp.Snapshot(),
		proxyApp.Query(),
		config.StateSync.TempDir,
		ssMetrics,
	)
	stateSyncReactor.SetLogger(logger.With("module", "statesync"))

	nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
	if err != nil {
		return nil, err
	}

	// Setup Transport.
	transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)

	// Setup Switch.
	p2pLogger := logger.With("module", "p2p")
	sw := createSwitch(
		config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
		stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
		config, p2pMetrics, mempoolReactor, bcReactor,
		stateSyncReactor, consensusReactor, evidenceReactor, p2pLogger,
	)

	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -303,7 +220,7 @@ func NewNode(config *cfg.Config,
		return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
	}

	addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
	addrBook, err := createAddrBook(config, p2pLogger, nodeKey)
	if err != nil {
		return nil, fmt.Errorf("could not create addrbook: %w", err)
	}
@@ -322,7 +239,8 @@ func NewNode(config *cfg.Config,
	// Note we currently use the addrBook regardless at least for AddOurAddress
	var pexReactor *pex.Reactor
	if config.P2P.PexReactor {
		pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
		pexReactor = createPEXReactor(addrBook, config, logger)
		sw.AddReactor("PEX", pexReactor)
	}

	// Add private IDs to addrbook to block those peers being added
@@ -333,11 +251,9 @@ func NewNode(config *cfg.Config,
		genesisDoc:    genDoc,
		privValidator: privValidator,

		transport: transport,
		sw:        sw,
		addrBook:  addrBook,
		nodeInfo:  nodeInfo,
		nodeKey:   nodeKey,
		sw:       sw,
		addrBook: addrBook,
		nodeKey:  nodeKey,

		stateStore: stateStore,
		blockStore: blockStore,
@@ -347,7 +263,6 @@ func NewNode(config *cfg.Config,
		consensusState:   consensusState,
		consensusReactor: consensusReactor,
		stateSyncReactor: stateSyncReactor,
		stateSync:        stateSync,
		stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
		pexReactor:       pexReactor,
		evidencePool:     evidencePool,
@@ -363,6 +278,15 @@ func NewNode(config *cfg.Config,
		option(node)
	}

	for name, reactor := range node.customReactors {
		if existingReactor := node.sw.Reactor(name); existingReactor != nil {
			node.sw.Logger.Info("Replacing existing reactor with a custom one",
				"name", name, "existing", existingReactor, "custom", reactor)
			node.sw.RemoveReactor(name, existingReactor)
		}
		node.sw.AddReactor(name, reactor)
	}

	return node, nil
}

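The loop above lets callers replace any named reactor on the switch. A minimal sketch of the option, mirroring the FOO reactor used in TestNodeNewNodeCustomReactors further down; the surrounding constructor arguments are assumptions drawn from that test, not part of this diff:

	// Hypothetical caller; p2pmock.NewReactor() is the mock used by the node tests.
	n, err := NewNode(
		config, privVal, nodeKey, clientCreator, genDocProvider,
		DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation),
		log.TestingLogger(),
		CustomReactors(map[string]p2p.Reactor{"FOO": p2pmock.NewReactor()}),
	)
	// A key that matches a built-in name is removed from the switch first,
	// so the custom reactor replaces it rather than being added twice.
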
@@ -375,6 +299,77 @@ func (n *Node) OnStart() error {
		time.Sleep(genTime.Sub(now))
	}

	// If an address is provided, listen on the socket for a connection from an
	// external signing process. This will overwrite the privvalidator provided in the constructor
	if n.config.PrivValidatorListenAddr != "" {
		var err error
		n.privValidator, err = createPrivValidatorSocketClient(n.config.PrivValidatorListenAddr, n.genesisDoc.ChainID, n.Logger)
		if err != nil {
			return fmt.Errorf("error with private validator socket client: %w", err)
		}
	}

	pubKey, err := n.privValidator.GetPubKey()
	if err != nil {
		return fmt.Errorf("can't get pubkey: %w", err)
	}

	state, err := n.stateStore.LoadFromDBOrGenesisDoc(n.genesisDoc)
	if err != nil {
		return fmt.Errorf("cannot load state: %w", err)
	}

	// Determine whether we should attempt state sync.
	stateSync := n.config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
	if stateSync && state.LastBlockHeight > 0 {
		n.Logger.Info("Found local state with non-zero height, skipping state sync")
		stateSync = false
	}

	// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
	// and replays any blocks as necessary to sync tendermint with the app.

	if !stateSync {
		if err := doHandshake(n.stateStore, state, n.blockStore, n.genesisDoc, n.eventBus, n.proxyApp, n.Logger); err != nil {
			return err
		}

		// Reload the state. It will have the Version.Consensus.App set by the
		// Handshake, and may have other modifications as well (ie. depending on
		// what happened during block replay).
		state, err = n.stateStore.Load()
		if err != nil {
			return fmt.Errorf("cannot load state: %w", err)
		}
	}

	nodeInfo, err := makeNodeInfo(n.config, n.nodeKey, n.txIndexer, n.genesisDoc, state)
	if err != nil {
		return err
	}

	for _, reactor := range n.customReactors {
		for _, chDesc := range reactor.GetChannels() {
			if !nodeInfo.HasChannel(chDesc.ID) {
				nodeInfo.Channels = append(nodeInfo.Channels, chDesc.ID)
			}
		}
	}
	n.nodeInfo = nodeInfo

	addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}

	// Setup Transport.
	transport, peerFilters := createTransport(n.config, n.nodeInfo, n.nodeKey, addr, n.proxyApp)

	n.sw.SetTransport(transport)
	n.sw.SetPeerFilters(peerFilters...)
	n.sw.SetNodeInfo(n.nodeInfo)
	n.sw.SetNodeKey(n.nodeKey)

	// run pprof server if it is enabled
	if n.config.RPC.IsPprofEnabled() {
		n.pprofSrv = n.startPprofServer()
@@ -395,16 +390,13 @@ func (n *Node) OnStart() error {
		n.rpcListeners = listeners
	}

	// Start the transport.
	addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}
	if err := n.transport.Listen(*addr); err != nil {
	if err := n.eventBus.Start(); err != nil {
		return err
	}

	n.isListening = true
	if err := n.indexerService.Start(); err != nil {
		return err
	}

	// Start the switch (the P2P server).
	err = n.sw.Start()
@@ -412,23 +404,37 @@ func (n *Node) OnStart() error {
		return err
	}

	n.isListening = true

	// Always connect to persistent peers
	err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
	if err != nil {
		return fmt.Errorf("could not dial peers from persistent_peers field: %w", err)
	}

	// Run state sync
	if n.stateSync {
		bcR, ok := n.bcReactor.(blockSyncReactor)
		if !ok {
			return fmt.Errorf("this blocksync reactor does not support switching from state sync")
		}
		err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
	// Determine whether we should do block sync. This must happen after the handshake, since the
	// app may modify the validator set, specifying ourself as the only validator.
	blockSync := n.config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)

	logNodeStartupInfo(state, pubKey, n.Logger)

	// Run start up phases
	if stateSync {
		err := startStateSync(n.stateSyncReactor, n.bcReactor, n.consensusReactor, n.stateSyncProvider,
			n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
		if err != nil {
			return fmt.Errorf("failed to start state sync: %w", err)
		}
	} else if blockSync {
		err := n.bcReactor.SwitchToBlockSync(state)
		if err != nil {
			return fmt.Errorf("failed to start block sync: %w", err)
		}
	} else {
		err := n.consensusReactor.SwitchToConsensus(state, false)
		if err != nil {
			return fmt.Errorf("failed to switch to consensus: %w", err)
		}
	}

	return nil
@@ -453,10 +459,6 @@ func (n *Node) OnStop() {
		n.Logger.Error("Error closing switch", "err", err)
	}

	if err := n.transport.Close(); err != nil {
		n.Logger.Error("Error closing transport", "err", err)
	}

	n.isListening = false

	// finally stop the listeners / external services
@@ -518,6 +520,8 @@ func (n *Node) ConfigureRPC() error {
		TxIndexer:        n.txIndexer,
		BlockIndexer:     n.blockIndexer,
		ConsensusReactor: n.consensusReactor,
		BlocksyncReactor: n.bcReactor,
		StatesyncReactor: n.stateSyncReactor,
		EventBus:         n.eventBus,
		Mempool:          n.mempool,

@@ -651,7 +655,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}

// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on addr.
// collectors on the provided address.
func (n *Node) startPrometheusServer() *http.Server {
	srv := &http.Server{
		Addr: n.config.Instrumentation.PrometheusListenAddr,
@@ -672,7 +676,7 @@ func (n *Node) startPrometheusServer() *http.Server {
	return srv
}

// starts a ppro
// starts a pprof server at the specified listen address
func (n *Node) startPprofServer() *http.Server {
	srv := &http.Server{
		Addr: n.config.RPC.PprofListenAddress,
@@ -682,7 +686,7 @@ func (n *Node) startPprofServer() *http.Server {
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			// Error starting or closing listener:
			n.Logger.Error("pprof HTTP server ListenAndServe", "err", err)
			n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
		}
	}()
	return srv
@@ -793,7 +797,7 @@ func makeNodeInfo(
		Network: genDoc.ChainID,
		Version: version.TMCoreSemVer,
		Channels: []byte{
			bc.BlocksyncChannel,
			blocksync.BlocksyncChannel,
			cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
			mempl.MempoolChannel,
			evidence.EvidenceChannel,

@@ -123,6 +123,8 @@ func TestNodeSetAppVersion(t *testing.T) {
	// create & start node
	n, err := DefaultNewNode(config, log.TestingLogger())
	require.NoError(t, err)
	require.NoError(t, n.Start())
	defer n.Stop() //nolint:errcheck

	// default config uses the kvstore app
	var appVersion = kvstore.ProtocolVersion
@@ -189,6 +191,8 @@ func TestNodeSetPrivValTCP(t *testing.T) {

	n, err := DefaultNewNode(config, log.TestingLogger())
	require.NoError(t, err)
	require.NoError(t, n.Start())
	defer n.Stop() //nolint:errcheck
	assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}

@@ -201,7 +205,7 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
	config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix

	_, err := DefaultNewNode(config, log.TestingLogger())
	assert.Error(t, err)
	require.NoError(t, err)
}

func TestNodeSetPrivValIPC(t *testing.T) {
@@ -451,7 +455,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
			RecvMessageCapacity: 100,
		},
	}
	customBlocksyncReactor := p2pmock.NewReactor()

	nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
	require.NoError(t, err)
@@ -464,7 +467,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
		DefaultDBProvider,
		DefaultMetricsProvider(config.Instrumentation),
		log.TestingLogger(),
		CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),
		CustomReactors(map[string]p2p.Reactor{"FOO": cr}),
	)
	require.NoError(t, err)

@@ -475,9 +478,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
	assert.True(t, cr.IsRunning())
	assert.Equal(t, cr, n.Switch().Reactor("FOO"))

	assert.True(t, customBlocksyncReactor.IsRunning())
	assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC"))

	channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
	assert.Contains(t, channels, mempl.MempoolChannel)
	assert.Contains(t, channels, cr.Channels[0].ID)

@@ -12,7 +12,7 @@ import (
	dbm "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	bc "github.com/tendermint/tendermint/blocksync"
	"github.com/tendermint/tendermint/blocksync"
	cfg "github.com/tendermint/tendermint/config"
	cs "github.com/tendermint/tendermint/consensus"
	"github.com/tendermint/tendermint/crypto"
@@ -99,20 +99,22 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
}

// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics)

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) {
		if config.Prometheus {
			return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
				proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID)
		}
		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics()
	}
}

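The provider now returns seven values instead of five. A minimal sketch of a caller destructuring it; the variable names and the chainID source are placeholders, not part of this diff:

	// Inside NewNode, after loading the genesis doc (sketch):
	csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics,
		bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)
	// bsMetrics feeds createBlocksyncReactor; ssMetrics feeds statesync.NewReactor.
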
@@ -138,7 +140,7 @@ func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.Block
	return
}

func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
func createProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
	proxyApp := proxy.NewAppConns(clientCreator, metrics)
	proxyApp.SetLogger(logger.With("module", "proxy"))
	if err := proxyApp.Start(); err != nil {
@@ -147,16 +149,13 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L
	return proxyApp, nil
}

func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
func createEventBus(logger log.Logger) *types.EventBus {
	eventBus := types.NewEventBus()
	eventBus.SetLogger(logger.With("module", "events"))
	if err := eventBus.Start(); err != nil {
		return nil, err
	}
	return eventBus, nil
	return eventBus
}

func createAndStartIndexerService(
func createIndexerService(
	config *cfg.Config,
	chainID string,
	dbProvider DBProvider,
@@ -197,10 +196,6 @@ func createAndStartIndexerService(
	indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
	indexerService.SetLogger(logger.With("module", "txindex"))

	if err := indexerService.Start(); err != nil {
		return nil, nil, nil, err
	}

	return indexerService, txIndexer, blockIndexer, nil
}

@@ -222,7 +217,7 @@ func doHandshake(
	return nil
}

func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger) {
	// Log the version info.
	logger.Info("Version info",
		"tendermint_version", version.TMCoreSemVer,
@@ -243,9 +238,9 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
	addr := pubKey.Address()
	// Log whether this node is a validator or an observer
	if state.Validators.HasAddress(addr) {
		consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
		logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
	} else {
		consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
		logger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
	}
}

@@ -334,12 +329,12 @@ func createBlocksyncReactor(config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore *store.BlockStore,
	blockSync bool,
	metrics *blocksync.Metrics,
	logger log.Logger,
) (bcReactor p2p.Reactor, err error) {
) (bcReactor *blocksync.Reactor, err error) {
	switch config.BlockSync.Version {
	case "v0":
		bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync)
		bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, metrics)
	case "v1", "v2":
		return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
	default:
@@ -351,31 +346,28 @@ func createBlocksyncReactor(config *cfg.Config,
}

func createConsensusReactor(config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mempool mempl.Mempool,
	evidencePool *evidence.Pool,
	privValidator types.PrivValidator,
	csMetrics *cs.Metrics,
	waitSync bool,
	eventBus *types.EventBus,
	consensusLogger log.Logger,
) (*cs.Reactor, *cs.State) {
	consensusState := cs.NewState(
		config.Consensus,
		state.Copy(),
		blockExec,
		blockStore,
		mempool,
		evidencePool,
		cs.StateMetrics(csMetrics),
		cs.WithMetrics(csMetrics),
	)
	consensusState.SetLogger(consensusLogger)
	if privValidator != nil {
		consensusState.SetPrivValidator(privValidator)
	}
	consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
	consensusReactor := cs.NewReactor(consensusState, cs.ReactorMetrics(csMetrics))
	consensusReactor.SetLogger(consensusLogger)
	// services which will be publishing and/or subscribing for messages (events)
	// consensusReactor will set it on consensusState and blockExecutor
@@ -387,6 +379,7 @@ func createTransport(
	config *cfg.Config,
	nodeInfo p2p.NodeInfo,
	nodeKey *p2p.NodeKey,
	netAddr *p2p.NetAddress,
	proxyApp proxy.AppConns,
) (
	*p2p.MultiplexTransport,
@@ -394,7 +387,7 @@ func createTransport(
) {
	var (
		mConnConfig = p2p.MConnConfig(config.P2P)
		transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
		transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, *netAddr, mConnConfig)
		connFilters = []p2p.ConnFilterFunc{}
		peerFilters = []p2p.PeerFilterFunc{}
	)
@@ -453,23 +446,17 @@ func createTransport(
}

func createSwitch(config *cfg.Config,
	transport p2p.Transport,
	p2pMetrics *p2p.Metrics,
	peerFilters []p2p.PeerFilterFunc,
	mempoolReactor p2p.Reactor,
	bcReactor p2p.Reactor,
	stateSyncReactor *statesync.Reactor,
	consensusReactor *cs.Reactor,
	evidenceReactor *evidence.Reactor,
	nodeInfo p2p.NodeInfo,
	nodeKey *p2p.NodeKey,
	p2pLogger log.Logger,
) *p2p.Switch {
	sw := p2p.NewSwitch(
		config.P2P,
		transport,
		p2p.WithMetrics(p2pMetrics),
		p2p.SwitchPeerFilters(peerFilters...),
	)
	sw.SetLogger(p2pLogger)
	sw.AddReactor("MEMPOOL", mempoolReactor)
@@ -477,15 +464,10 @@ func createSwitch(config *cfg.Config,
	sw.AddReactor("CONSENSUS", consensusReactor)
	sw.AddReactor("EVIDENCE", evidenceReactor)
	sw.AddReactor("STATESYNC", stateSyncReactor)

	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
	return sw
}

func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
func createAddrBook(config *cfg.Config,
	p2pLogger log.Logger, nodeKey *p2p.NodeKey,
) (pex.AddrBook, error) {
	addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
@@ -506,15 +488,10 @@ func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
		}
		addrBook.AddOurAddress(addr)
	}

	sw.SetAddrBook(addrBook)

	return addrBook, nil
}

func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
	sw *p2p.Switch, logger log.Logger,
) *pex.Reactor {
func createPEXReactor(addrBook pex.AddrBook, config *cfg.Config, logger log.Logger) *pex.Reactor {
	// TODO persistent peers ? so we can have their DNS addrs saved
	pexReactor := pex.NewReactor(addrBook,
		&pex.ReactorConfig{
@@ -529,7 +506,6 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
			PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
		})
	pexReactor.SetLogger(logger.With("module", "pex"))
	sw.AddReactor("PEX", pexReactor)
	return pexReactor
}

@@ -575,16 +551,17 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React
		}

		if blockSync {
			// FIXME Very ugly to have these metrics bleed through here.
			conR.Metrics.StateSyncing.Set(0)
			conR.Metrics.BlockSyncing.Set(1)
			err = bcR.SwitchToBlockSync(state)
			if err != nil {
				ssR.Logger.Error("Failed to switch to block sync", "err", err)
				return
			}
		} else {
			conR.SwitchToConsensus(state, true)
			err := conR.SwitchToConsensus(state, true)
			if err != nil {
				ssR.Logger.Error("Failed to switch to consensus", "err", err)
				return
			}
		}
	}()
	return nil
@@ -659,7 +636,7 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
	return nil
}

func createAndStartPrivValidatorSocketClient(
func createPrivValidatorSocketClient(
	listenAddr,
	chainID string,
	logger log.Logger,
@@ -674,12 +651,6 @@ func createAndStartPrivValidatorSocketClient(
		return nil, fmt.Errorf("failed to start private validator: %w", err)
	}

	// try to get a pubkey from private validate first time
	_, err = pvsc.GetPubKey()
	if err != nil {
		return nil, fmt.Errorf("can't get pubkey: %w", err)
	}

	const (
		retries = 50 // 50 * 100ms = 5s total
		timeout = 100 * time.Millisecond

@@ -1,6 +1,7 @@
package p2p

import (
	"errors"
	"fmt"
	"math"
	"sync"
@@ -107,7 +108,6 @@ type SwitchOption func(*Switch)
// NewSwitch creates a new Switch with the given config.
func NewSwitch(
	cfg *config.P2PConfig,
	transport Transport,
	options ...SwitchOption,
) *Switch {

@@ -121,7 +121,6 @@ func NewSwitch(
		dialing:              cmap.NewCMap(),
		reconnecting:         cmap.NewCMap(),
		metrics:              NopMetrics(),
		transport:            transport,
		filterTimeout:        defaultFilterTimeout,
		persistentPeersAddrs: make([]*NetAddress, 0),
		unconditionalPeerIDs: make(map[ID]struct{}),
@@ -224,11 +223,30 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
	sw.nodeKey = nodeKey
}

func (sw *Switch) SetPeerFilters(filters ...PeerFilterFunc) {
	sw.peerFilters = filters
}

func (sw *Switch) SetTransport(transport Transport) {
	if sw.IsRunning() {
		panic("cannot set transport while switch is running")
	}
	sw.transport = transport
}

//---------------------------------------------------------------------
// Service start/stop

// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
	if sw.transport == nil {
		return errors.New("transport not set")
	}

	if err := sw.transport.Start(); err != nil {
		return err
	}

	// Start reactors
	for _, reactor := range sw.reactors {
		err := reactor.Start()
@@ -257,6 +275,10 @@ func (sw *Switch) OnStop() {
			sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err)
		}
	}

	if err := sw.transport.Stop(); err != nil {
		sw.Logger.Error("closing transport", "err", err)
	}
}

//---------------------------------------------------------------------

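Taken together, the Switch changes invert the old wiring: the transport is no longer a constructor argument but is injected before start. A minimal sketch of the new order, with placeholder variable names that are not part of this diff:

	sw := p2p.NewSwitch(config.P2P, p2p.WithMetrics(p2pMetrics))
	sw.SetTransport(transport)        // must precede Start; OnStart errors with "transport not set" otherwise
	sw.SetPeerFilters(peerFilters...) // peer filters also move out of the constructor
	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)
	if err := sw.Start(); err != nil { // OnStart starts the transport, then the reactors
		return err
	}
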
@@ -22,6 +22,7 @@ import (
	"github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/crypto/ed25519"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	tmsync "github.com/tendermint/tendermint/libs/sync"
	"github.com/tendermint/tendermint/p2p/conn"
	p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
@@ -695,9 +696,16 @@ func TestSwitchAcceptRoutine(t *testing.T) {
}

type errorTransport struct {
	service.BaseService
	acceptErr error
}

func newErrTransport(acceptErr error) *errorTransport {
	t := &errorTransport{acceptErr: acceptErr}
	t.BaseService = *service.NewBaseService(nil, "Error Transport", t)
	return t
}

func (et errorTransport) NetAddress() NetAddress {
	panic("not implemented")
}
@@ -713,7 +721,9 @@ func (errorTransport) Cleanup(Peer) {
}

func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
	sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
	sw := NewSwitch(cfg)
	sw.SetTransport(newErrTransport(ErrFilterTimeout{}))

	assert.NotPanics(t, func() {
		err := sw.Start()
		require.NoError(t, err)
@@ -721,7 +731,8 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
		require.NoError(t, err)
	})

	sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
	sw = NewSwitch(cfg)
	sw.SetTransport(newErrTransport(ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}))
	assert.NotPanics(t, func() {
		err := sw.Start()
		require.NoError(t, err)
@@ -730,7 +741,8 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
	})
	// TODO(melekes) check we remove our address from addrBook

	sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
	sw = NewSwitch(cfg)
	sw.SetTransport(newErrTransport(ErrTransportClosed{}))
	assert.NotPanics(t, func() {
		err := sw.Start()
		require.NoError(t, err)

@@ -194,14 +194,11 @@ func MakeSwitch(
		panic(err)
	}

	t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg))

	if err := t.Listen(*addr); err != nil {
		panic(err)
	}
	t := NewMultiplexTransport(nodeInfo, nodeKey, *addr, MConnConfig(cfg))

	// TODO: let the config be passed in?
	sw := initSwitch(i, NewSwitch(cfg, t, opts...))
	sw := initSwitch(i, NewSwitch(cfg, opts...))
	sw.SetTransport(t)
	sw.SetLogger(log.TestingLogger().With("switch", i))
	sw.SetNodeKey(&nodeKey)


@@ -11,6 +11,7 @@ import (
	"github.com/cosmos/gogoproto/proto"
	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/libs/protoio"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/p2p/conn"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
@@ -59,6 +60,8 @@ type peerConfig struct {
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
type Transport interface {
	service.Service

	// Listening address.
	NetAddress() NetAddress

@@ -72,13 +75,6 @@ type Transport interface {
	Cleanup(Peer)
}

// transportLifecycle bundles the methods for callers to control start and stop
// behavior.
type transportLifecycle interface {
	Close() error
	Listen(NetAddress) error
}

// ConnFilterFunc to be implemented by filter hooks after a new connection has
// been established. The set of existing connections is passed along together
// with all resolved IPs for the new connection.
@@ -136,6 +132,8 @@ func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
	service.BaseService

	netAddr                NetAddress
	listener               net.Listener
	maxIncomingConnections int // see MaxIncomingConnections
@@ -162,15 +160,15 @@ type MultiplexTransport struct {

// Test multiplexTransport for interface completeness.
var _ Transport = (*MultiplexTransport)(nil)
var _ transportLifecycle = (*MultiplexTransport)(nil)

// NewMultiplexTransport returns a tcp connected multiplexed peer.
func NewMultiplexTransport(
	nodeInfo NodeInfo,
	nodeKey NodeKey,
	netAddr NetAddress,
	mConfig conn.MConnConfig,
) *MultiplexTransport {
	return &MultiplexTransport{
	t := &MultiplexTransport{
		acceptc:     make(chan accept),
		closec:      make(chan struct{}),
		dialTimeout: defaultDialTimeout,
@@ -179,9 +177,12 @@ func NewMultiplexTransport(
		mConfig:  mConfig,
		nodeInfo: nodeInfo,
		nodeKey:  nodeKey,
		netAddr:  netAddr,
		conns:    NewConnSet(),
		resolver: net.DefaultResolver,
	}
	t.BaseService = *service.NewBaseService(nil, "P2P Transport", t)
	return t
}

// NetAddress implements Transport.
@@ -234,20 +235,20 @@ func (mt *MultiplexTransport) Dial(
	return p, nil
}

// Close implements transportLifecycle.
func (mt *MultiplexTransport) Close() error {
// OnStop implements Service.
func (mt *MultiplexTransport) OnStop() {
	close(mt.closec)

	if mt.listener != nil {
		return mt.listener.Close()
		if err := mt.listener.Close(); err != nil {
			mt.Logger.Error("closing listener", "err", err)
		}
	}

	return nil
}

// Listen implements transportLifecycle.
func (mt *MultiplexTransport) Listen(addr NetAddress) error {
	ln, err := net.Listen("tcp", addr.DialString())
// OnStart implements Service.
func (mt *MultiplexTransport) OnStart() error {
	ln, err := net.Listen("tcp", mt.netAddr.DialString())
	if err != nil {
		return err
	}
@@ -256,7 +257,6 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
		ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
	}

	mt.netAddr = addr
	mt.listener = ln

	go mt.acceptPeers()

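With the transport now a Service, the old Listen/Close pair collapses into Start/Stop. A minimal usage sketch; the identifiers are placeholders, not part of this diff:

	mt := p2p.NewMultiplexTransport(nodeInfo, nodeKey, *netAddr, p2p.MConnConfig(cfg))
	if err := mt.Start(); err != nil { // OnStart binds the TCP listener on netAddr
		return err
	}
	defer mt.Stop() //nolint:errcheck // OnStop closes closec and the listener
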
@@ -10,6 +10,7 @@ import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/tendermint/tendermint/crypto/ed25519"
	"github.com/tendermint/tendermint/libs/protoio"
	"github.com/tendermint/tendermint/p2p/conn"
@@ -28,20 +29,26 @@ func emptyNodeInfo() NodeInfo {
func newMultiplexTransport(
	nodeInfo NodeInfo,
	nodeKey NodeKey,
	netAddr NetAddress,
) *MultiplexTransport {
	return NewMultiplexTransport(
		nodeInfo, nodeKey, conn.DefaultMConnConfig(),
		nodeInfo, nodeKey, netAddr, conn.DefaultMConnConfig(),
	)
}

func TestTransportMultiplexConnFilter(t *testing.T) {
	nodeKey := NodeKey{
		PrivKey: ed25519.GenPrivKey(),
	}
	id := nodeKey.ID()
	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	require.NoError(t, err)

	mt := newMultiplexTransport(
		emptyNodeInfo(),
		NodeKey{
			PrivKey: ed25519.GenPrivKey(),
		},
		nodeKey,
		*addr,
	)
	id := mt.nodeKey.ID()

	MultiplexTransportConnFilters(
		func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
@@ -51,14 +58,8 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
		},
	)(mt)

	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	if err != nil {
		t.Fatal(err)
	}

	if err := mt.Listen(*addr); err != nil {
		t.Fatal(err)
	}
	err = mt.Start()
	require.NoError(t, err)

	errc := make(chan error)

@@ -89,13 +90,18 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
}

func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
	nodeKey := NodeKey{
		PrivKey: ed25519.GenPrivKey(),
	}
	id := nodeKey.ID()
	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	require.NoError(t, err)

	mt := newMultiplexTransport(
		emptyNodeInfo(),
		NodeKey{
			PrivKey: ed25519.GenPrivKey(),
		},
		nodeKey,
		*addr,
	)
	id := mt.nodeKey.ID()

	MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
	MultiplexTransportConnFilters(
@@ -105,14 +111,9 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
		},
	)(mt)

	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	if err != nil {
		t.Fatal(err)
	}

	if err := mt.Listen(*addr); err != nil {
		t.Fatal(err)
	}
	err = mt.Start()
	require.NoError(t, err)
	defer mt.Stop() //nolint:errcheck

	errc := make(chan error)
	go func() {
@@ -140,6 +141,10 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
	pv := ed25519.GenPrivKey()
	id := PubKeyToID(pv.PubKey())
	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	if err != nil {
		t.Fatal(err)
	}
	mt := newMultiplexTransport(
		testNodeInfo(
			id, "transport",
@@ -147,26 +152,22 @@ func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
		NodeKey{
			PrivKey: pv,
		},
		*addr,
	)

	MultiplexTransportMaxIncomingConnections(0)(mt)

	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	if err != nil {
		t.Fatal(err)
	}
	const maxIncomingConns = 2
	MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt)
	if err := mt.Listen(*addr); err != nil {
		t.Fatal(err)
	}
	err = mt.Start()
	require.NoError(t, err)

	laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())

	// Connect more peers than max
	for i := 0; i <= maxIncomingConns; i++ {
		errc := make(chan error)
		go testDialer(*laddr, errc)
		go testDialer(t, *laddr, errc)

		err = <-errc
		if i < maxIncomingConns {
@@ -198,7 +199,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {

	// Setup dialers.
	for i := 0; i < nDialers; i++ {
		go testDialer(*laddr, errc)
		go testDialer(t, *laddr, errc)
	}

	// Catch connection errors.
@@ -235,23 +236,26 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
		}
	}

	if err := mt.Close(); err != nil {
	if err := mt.Stop(); err != nil {
		t.Errorf("close errored: %v", err)
	}
}

func testDialer(dialAddr NetAddress, errc chan error) {
	var (
		pv     = ed25519.GenPrivKey()
		dialer = newMultiplexTransport(
			testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
			NodeKey{
				PrivKey: pv,
			},
		)
func testDialer(t *testing.T, dialAddr NetAddress, errc chan error) {
	pv := ed25519.GenPrivKey()
	id := PubKeyToID(pv.PubKey())
	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	require.NoError(t, err)

	dialer := newMultiplexTransport(
		testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
		NodeKey{
			PrivKey: pv,
		},
		*addr,
	)

	_, err := dialer.Dial(dialAddr, peerConfig{})
	_, err = dialer.Dial(dialAddr, peerConfig{})
	if err != nil {
		errc <- err
		return
@@ -319,15 +323,14 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
	go func() {
		<-slowc

		var (
			dialer = newMultiplexTransport(
				fastNodeInfo,
				NodeKey{
					PrivKey: fastNodePV,
				},
			)
		)
		addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
		dialer := newMultiplexTransport(
			fastNodeInfo,
			NodeKey{
				PrivKey: fastNodePV,
			},
			*addr,
		)

		_, err := dialer.Dial(*addr, peerConfig{})
		if err != nil {
@@ -360,17 +363,16 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
	errc := make(chan error)

	go func() {
		var (
			pv     = ed25519.GenPrivKey()
			dialer = newMultiplexTransport(
				testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
				NodeKey{
					PrivKey: pv,
				},
			)
		)

		pv := ed25519.GenPrivKey()
		addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
		dialer := newMultiplexTransport(
			testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
			NodeKey{
				PrivKey: pv,
			},
			*addr,
		)

		_, err := dialer.Dial(*addr, peerConfig{})
		if err != nil {
@@ -401,6 +403,7 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
	errc := make(chan error)

	go func() {
		addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
		dialer := newMultiplexTransport(
			testNodeInfo(
				PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer",
@@ -408,8 +411,8 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
			NodeKey{
				PrivKey: ed25519.GenPrivKey(),
			},
			*addr,
		)
		addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())

		_, err := dialer.Dial(*addr, peerConfig{})
		if err != nil {
@@ -437,18 +440,16 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
func TestTransportMultiplexDialRejectWrongID(t *testing.T) {
	mt := testSetupMultiplexTransport(t)

	var (
		pv     = ed25519.GenPrivKey()
		dialer = newMultiplexTransport(
			testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
			NodeKey{
				PrivKey: pv,
			},
		)
	)

	pv := ed25519.GenPrivKey()
	wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey())
	addr := NewNetAddress(wrongID, mt.listener.Addr())
	dialer := newMultiplexTransport(
		testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
		NodeKey{
			PrivKey: pv,
		},
		*addr,
	)

	_, err := dialer.Dial(*addr, peerConfig{})
	if err != nil {
@@ -471,14 +472,15 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) {
	go func() {
		var (
			pv     = ed25519.GenPrivKey()
			addr   = NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
			dialer = newMultiplexTransport(
				testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"),
				NodeKey{
					PrivKey: pv,
				},
				*addr,
			)
		)
		addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())

		_, err := dialer.Dial(*addr, peerConfig{})
		if err != nil {
@@ -622,11 +624,16 @@ func TestTransportHandshake(t *testing.T) {
}

func TestTransportAddChannel(t *testing.T) {
	pv := ed25519.GenPrivKey()
	id := PubKeyToID(pv.PubKey())
	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	require.NoError(t, err)
	mt := newMultiplexTransport(
		emptyNodeInfo(),
		NodeKey{
			PrivKey: ed25519.GenPrivKey(),
			PrivKey: pv,
		},
		*addr,
	)
	testChannel := byte(0x01)

@@ -638,27 +645,27 @@ func TestTransportAddChannel(t *testing.T) {

// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
	var (
		pv = ed25519.GenPrivKey()
		id = PubKeyToID(pv.PubKey())
		mt = newMultiplexTransport(
			testNodeInfo(
				id, "transport",
			),
			NodeKey{
				PrivKey: pv,
			},
		)
	)

	pv := ed25519.GenPrivKey()
	id := PubKeyToID(pv.PubKey())
	addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
	if err != nil {
		t.Fatal(err)
	}
	mt := newMultiplexTransport(
		testNodeInfo(
			id, "transport",
		),
		NodeKey{
			PrivKey: pv,
		},
		*addr,
	)

	if err := mt.Listen(*addr); err != nil {
		t.Fatal(err)
	}
	err = mt.Start()
	require.NoError(t, err)
	t.Cleanup(func() {
		mt.Stop() //nolint:errcheck
	})

	// give the listener some time to get ready
	time.Sleep(20 * time.Millisecond)

@@ -5,6 +5,7 @@ import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/blocksync"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/consensus"
	"github.com/tendermint/tendermint/crypto"
@@ -16,6 +17,7 @@ import (
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/state/indexer"
	"github.com/tendermint/tendermint/state/txindex"
	"github.com/tendermint/tendermint/statesync"
	"github.com/tendermint/tendermint/types"
)

@@ -91,6 +93,8 @@ type Environment struct {
	TxIndexer        txindex.TxIndexer
	BlockIndexer     indexer.BlockIndexer
	ConsensusReactor *consensus.Reactor
	BlocksyncReactor *blocksync.Reactor
	StatesyncReactor *statesync.Reactor
	EventBus         *types.EventBus // thread safe
	Mempool          mempl.Mempool

@@ -199,9 +203,5 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
}

func latestUncommittedHeight() int64 {
	nodeIsSyncing := env.ConsensusReactor.WaitSync()
	if nodeIsSyncing {
		return env.BlockStore.Height()
	}
	return env.BlockStore.Height() + 1
}

@@ -51,6 +51,16 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
		votingPower = val.VotingPower
	}

	phase := "initializing"
	switch {
	case env.StatesyncReactor.IsSyncing():
		phase = "statesync"
	case env.BlocksyncReactor.IsSyncing():
		phase = "blocksync"
	case env.ConsensusReactor.IsConsensusRunning():
		phase = "consensus"
	}

	result := &ctypes.ResultStatus{
		NodeInfo: env.P2PTransport.NodeInfo().(p2p.DefaultNodeInfo),
		SyncInfo: ctypes.SyncInfo{
@@ -62,7 +72,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
			EarliestAppHash:     earliestAppHash,
			EarliestBlockHeight: earliestBlockHeight,
			EarliestBlockTime:   time.Unix(0, earliestBlockTimeNano),
			CatchingUp:          env.ConsensusReactor.WaitSync(),
			Phase:               phase,
		},
		ValidatorInfo: ctypes.ValidatorInfo{
			Address: env.PubKey.Address(),

@@ -86,7 +86,9 @@ type SyncInfo struct {
	EarliestBlockHeight int64     `json:"earliest_block_height"`
	EarliestBlockTime   time.Time `json:"earliest_block_time"`

	CatchingUp bool `json:"catching_up"`
	// Phase indicates which process is advancing state:
	// either statesync, blocksync or consensus.
	Phase string `json:"phase"`
}

// Info about the node's validator

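A sketch of reading the new field over RPC; the rpchttp import path and the node address are assumptions, not part of this diff:

	// import rpchttp "github.com/tendermint/tendermint/rpc/client/http" (assumed)
	c, err := rpchttp.New("tcp://127.0.0.1:26657", "/websocket")
	if err != nil {
		panic(err)
	}
	status, err := c.Status(context.Background())
	if err != nil {
		panic(err)
	}
	// One of "initializing", "statesync", "blocksync" or "consensus".
	fmt.Println(status.SyncInfo.Phase)
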
@@ -81,6 +81,9 @@ type State struct {

// Copy makes a copy of the State for mutating.
func (state State) Copy() State {
	if state.IsEmpty() {
		return State{}
	}

	return State{
		Version: state.Version,

30
statesync/metrics.gen.go
Normal file
@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.

package statesync

import (
	"github.com/go-kit/kit/metrics/discard"
	prometheus "github.com/go-kit/kit/metrics/prometheus"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
	labels := []string{}
	for i := 0; i < len(labelsAndValues); i += 2 {
		labels = append(labels, labelsAndValues[i])
	}
	return &Metrics{
		Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "syncing",
			Help:      "Whether or not a node is state syncing. 1 if yes, 0 if no.",
		}, labels).With(labelsAndValues...),
	}
}

func NopMetrics() *Metrics {
	return &Metrics{
		Syncing: discard.NewGauge(),
	}
}
19
statesync/metrics.go
Normal file
@@ -0,0 +1,19 @@
package statesync

import (
	"github.com/go-kit/kit/metrics"
)

const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "statesync"
)

//go:generate go run ../scripts/metricsgen -struct=Metrics

// Metrics contains metrics exposed by this package.
type Metrics struct {
	// Whether or not a node is state syncing. 1 if yes, 0 if no.
	Syncing metrics.Gauge
}
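A sketch of wiring the generated metrics by hand, mirroring what DefaultMetricsProvider does earlier in this diff; the config and chainID values are placeholders:

	// Prometheus-backed metrics when instrumentation is on, no-ops otherwise.
	var ssMetrics *statesync.Metrics
	if config.Instrumentation.Prometheus {
		ssMetrics = statesync.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", chainID)
	} else {
		ssMetrics = statesync.NopMetrics()
	}
	_ = ssMetrics // handed to statesync.NewReactor, as shown below
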
@@ -33,6 +33,7 @@ type Reactor struct {
	conn      proxy.AppConnSnapshot
	connQuery proxy.AppConnQuery
	tempDir   string
	metrics   *Metrics

	// This will only be set when a state sync is in progress. It is used to feed received
	// snapshots and chunks into the sync.
@@ -46,12 +47,14 @@ func NewReactor(
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	tempDir string,
	metrics *Metrics,
) *Reactor {

	r := &Reactor{
		cfg:       cfg,
		conn:      conn,
		connQuery: connQuery,
		metrics:   metrics,
	}
	r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)

@@ -92,6 +95,13 @@ func (r *Reactor) AddPeer(peer p2p.Peer) {
	}
}

// IsSyncing returns true if state sync is actively being used to restore a state
func (r *Reactor) IsSyncing() bool {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	return r.syncer != nil
}

// RemovePeer implements p2p.Reactor.
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	r.mtx.RLock()
@@ -265,6 +275,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
		r.mtx.Unlock()
		return sm.State{}, nil, errors.New("a state sync is already in progress")
	}
	r.metrics.Syncing.Add(1)
	r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
	r.mtx.Unlock()

@@ -284,6 +295,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)

	r.mtx.Lock()
	r.syncer = nil
	r.metrics.Syncing.Add(0)
	r.mtx.Unlock()
	return state, commit, err
}

@@ -71,7 +71,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {

	// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
	cfg := config.DefaultStateSyncConfig()
	r := NewReactor(*cfg, conn, nil, "")
	r := NewReactor(*cfg, conn, nil, "", NopMetrics())
	err := r.Start()
	require.NoError(t, err)
	t.Cleanup(func() {
@@ -161,7 +161,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {

	// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
	cfg := config.DefaultStateSyncConfig()
	r := NewReactor(*cfg, conn, nil, "")
	r := NewReactor(*cfg, conn, nil, "", NopMetrics())
	err := r.Start()
	require.NoError(t, err)
	t.Cleanup(func() {

@@ -101,7 +101,7 @@ func (voteSet *VoteSet) ChainID() string {

// Implements VoteSetReader.
func (voteSet *VoteSet) GetHeight() int64 {
	if voteSet == nil {
	if voteSet.IsEmpty() {
		return 0
	}
	return voteSet.height
@@ -109,7 +109,7 @@ func (voteSet *VoteSet) GetHeight() int64 {

// Implements VoteSetReader.
func (voteSet *VoteSet) GetRound() int32 {
	if voteSet == nil {
	if voteSet.IsEmpty() {
		return -1
	}
	return voteSet.round
@@ -117,7 +117,7 @@ func (voteSet *VoteSet) GetRound() int32 {

// Implements VoteSetReader.
func (voteSet *VoteSet) Type() byte {
	if voteSet == nil {
	if voteSet.IsEmpty() {
		return 0x00
	}
	return byte(voteSet.signedMsgType)
@@ -125,12 +125,16 @@ func (voteSet *VoteSet) Type() byte {

// Implements VoteSetReader.
func (voteSet *VoteSet) Size() int {
	if voteSet == nil {
	if voteSet.IsEmpty() {
		return 0
	}
	return voteSet.valSet.Size()
}

func (voteSet *VoteSet) IsEmpty() bool {
	return voteSet == nil || voteSet.height == 0
}

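IsEmpty folds the old per-accessor nil checks into one helper; calling a method on a nil *VoteSet is legal in Go because the receiver is just an ordinary argument. A tiny standalone illustration, not part of this diff:

	var vs *VoteSet
	fmt.Println(vs.IsEmpty())   // true: the nil receiver short-circuits
	fmt.Println(vs.GetHeight()) // 0, via the same guard
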
// Returns added=true if vote is valid and new.
// Otherwise returns err=ErrVote[
//
