Compare commits


10 Commits

Author         SHA1        Message                                                    Date
Callum Waters  7018c73baf  fix up a few things                                        2022-11-09 15:17:43 +01:00
Callum Waters  4021df503b  lint                                                       2022-11-09 13:51:33 +01:00
Callum Waters  11976a4863  fix lints                                                  2022-11-01 14:18:13 +01:00
Callum Waters  296ec7e113  merge                                                      2022-11-01 11:58:53 +01:00
Callum Waters  933256c862  node start up phases                                       2022-11-01 11:26:46 +01:00
Callum Waters  a2610a9998  lint                                                       2022-10-26 17:07:23 +02:00
Callum Waters  da8810f480  fix gosec compaints                                        2022-10-26 17:01:11 +02:00
Callum Waters  10dca45e8c  Merge branch 'main' of github.com:tendermint/tendermint   2022-10-26 16:45:32 +02:00
Callum Waters  67bc51ad72  split out node into a setup file                           2022-10-26 16:45:27 +02:00
Callum Waters  3faf580a0d  remove unused id file                                      2022-10-26 16:44:45 +02:00
85 changed files with 854 additions and 3227 deletions

View File

@@ -45,7 +45,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack upon pre-release
uses: slackapi/slack-github-action@v1.23.0
uses: slackapi/slack-github-action@v1.22.0
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

View File

@@ -42,7 +42,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack upon release
uses: slackapi/slack-github-action@v1.23.0
uses: slackapi/slack-github-action@v1.22.0
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

View File

@@ -12,7 +12,6 @@
- Go API
- [p2p] \#9625 Remove unused p2p/trust package (@cmwaters)
- [light] \#9420 Default trust level has been modified to a more secure value of 2/3 (@cmwaters)
- Blockchain Protocol
@@ -29,7 +28,6 @@
- [pubsub] \#7319 Performance improvements for the event query API (@creachadair)
- [p2p/pex] \#6509 Improve addrBook.hash performance (@cuonglm)
- [crypto/merkle] \#6443 & \#6513 Improve HashAlternatives performance (@cuonglm, @marbar3778)
- [rpc] \#9650 Enable caching of RPC responses (@JayT106)
### BUG FIXES
@@ -41,29 +39,6 @@ Special thanks to external contributors on this release:
Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint).
## v0.38.0
### BREAKING CHANGES
- CLI/RPC/Config
- Apps
- P2P Protocol
- Go API
- [light] \#9420 Default trust level has been modified to a more secure value of 2/3 (@cmwaters)
- Blockchain Protocol
### FEATURES
### IMPROVEMENTS
### BUG FIXES
## v0.37.0
### BREAKING CHANGES
- CLI/RPC/Config
@@ -122,4 +97,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [consensus] \#9229 fix round number of `enterPropose` when handling `RoundStepNewRound` timeout. (@fatcat22)
- [docker] \#9073 enable cross platform build using docker buildx
- [blocksync] \#9518 handle the case when the sending queue is full: retry block request after a timeout
- [blocksync] \#9518 handle the case when the sending queue is full: retry block request after a timeout

View File

@@ -113,15 +113,10 @@ For more information on upgrading, see [UPGRADING.md](./UPGRADING.md).
### Supported Versions
Because we are a small core team, we have limited capacity to ship patch
updates, including security updates. Consequently, we strongly recommend keeping
Tendermint up-to-date. Upgrading instructions can be found in
[UPGRADING.md](./UPGRADING.md).
Currently supported versions include:
- v0.34.x
- v0.37.x (release candidate)
Because we are a small core team, we only ship patch updates, including security
updates, to the most recent minor release and the second-most recent minor
release. Consequently, we strongly recommend keeping Tendermint up-to-date.
Upgrading instructions can be found in [UPGRADING.md](./UPGRADING.md).
## Resources

View File

@@ -98,7 +98,7 @@ Sometimes it's necessary to rename libraries to avoid naming collisions or ambig
* Make use of table driven testing where possible and not-cumbersome
* [Inspiration](https://dave.cheney.net/2013/06/09/writing-table-driven-tests-in-go)
* Make use of [assert](https://godoc.org/github.com/stretchr/testify/assert) and [require](https://godoc.org/github.com/stretchr/testify/require)
* When using mocks, it is recommended to use Testify [mock](<https://pkg.go.dev/github.com/stretchr/testify/mock>
* When using mocks, it is recommended to use Testify [mock] (<https://pkg.go.dev/github.com/stretchr/testify/mock>
) along with [Mockery](https://github.com/vektra/mockery) for autogeneration
## Errors
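
The bullets above recommend table-driven tests together with testify's require/assert helpers. A minimal sketch of that combination follows; the `Add` function and the test values are made up purely for illustration and are not part of this diff:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// Add is a hypothetical function, used only to illustrate the pattern.
func Add(a, b int) int { return a + b }

func TestAdd(t *testing.T) {
	// Each table entry is one named scenario; t.Run gives each its own subtest.
	cases := []struct {
		name string
		a, b int
		want int
	}{
		{"zeros", 0, 0, 0},
		{"positive", 2, 3, 5},
		{"negative", -2, 1, -1},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.want, Add(tc.a, tc.b))
		})
	}
}
```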

View File

@@ -6,6 +6,8 @@ import (
tmsync "github.com/tendermint/tendermint/libs/sync"
)
var _ Client = (*localClient)(nil)
// NOTE: use defer to unlock mutex because Application might panic (e.g., in
// case of malicious tx or query). It only makes sense for publicly exposed
// methods like CheckTx (/broadcast_tx_* RPC endpoint) or Query (/abci_query
@@ -22,6 +24,8 @@ var _ Client = (*localClient)(nil)
// NewLocalClient creates a local client, which will be directly calling the
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(tmsync.Mutex)
@@ -305,8 +309,7 @@ func (app *localClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*type
}
func (app *localClient) LoadSnapshotChunkSync(
req types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@@ -315,8 +318,7 @@ func (app *localClient) LoadSnapshotChunkSync(
}
func (app *localClient) ApplySnapshotChunkSync(
req types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
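
The doc comment above notes that NewLocalClient calls the application's methods directly and allocates its own mutex when passed nil. A rough usage sketch; the kvstore example application, the helper name, and the `abcicli` import alias are assumptions based on the usual Tendermint layout, not taken from this diff:

```go
package example

import (
	abcicli "github.com/tendermint/tendermint/abci/client"
	"github.com/tendermint/tendermint/abci/example/kvstore"
)

// newLocalABCIClient wires an in-process application into a local ABCI client.
// Passing a nil mutex lets NewLocalClient allocate its own tmsync.Mutex; pass
// a shared mutex instead when several connections must serialize access to
// the same application instance.
func newLocalABCIClient() abcicli.Client {
	app := kvstore.NewApplication()
	return abcicli.NewLocalClient(nil, app)
}
```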

View File

@@ -1,263 +0,0 @@
package abcicli
import (
"sync"
types "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
)
type unsyncLocalClient struct {
service.BaseService
types.Application
// This mutex is exclusively used to protect the callback.
mtx sync.RWMutex
Callback
}
var _ Client = (*unsyncLocalClient)(nil)
// NewUnsyncLocalClient creates an unsynchronized local client, which will be
// directly calling the methods of the given app.
//
// Unlike NewLocalClient, it does not hold a mutex around the application, so
// it is up to the application to manage its synchronization properly.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}
func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
}
// TODO: change types.Application to include Error()?
func (app *unsyncLocalClient) Error() error {
return nil
}
func (app *unsyncLocalClient) FlushAsync() *ReqRes {
// Do nothing
return newLocalReqRes(types.ToRequestFlush(), nil)
}
func (app *unsyncLocalClient) EchoAsync(msg string) *ReqRes {
return app.callback(
types.ToRequestEcho(msg),
types.ToResponseEcho(msg),
)
}
func (app *unsyncLocalClient) InfoAsync(req types.RequestInfo) *ReqRes {
res := app.Application.Info(req)
return app.callback(
types.ToRequestInfo(req),
types.ToResponseInfo(res),
)
}
func (app *unsyncLocalClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
res := app.Application.DeliverTx(params)
return app.callback(
types.ToRequestDeliverTx(params),
types.ToResponseDeliverTx(res),
)
}
func (app *unsyncLocalClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
}
func (app *unsyncLocalClient) QueryAsync(req types.RequestQuery) *ReqRes {
res := app.Application.Query(req)
return app.callback(
types.ToRequestQuery(req),
types.ToResponseQuery(res),
)
}
func (app *unsyncLocalClient) CommitAsync() *ReqRes {
res := app.Application.Commit()
return app.callback(
types.ToRequestCommit(),
types.ToResponseCommit(res),
)
}
func (app *unsyncLocalClient) InitChainAsync(req types.RequestInitChain) *ReqRes {
res := app.Application.InitChain(req)
return app.callback(
types.ToRequestInitChain(req),
types.ToResponseInitChain(res),
)
}
func (app *unsyncLocalClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
res := app.Application.BeginBlock(req)
return app.callback(
types.ToRequestBeginBlock(req),
types.ToResponseBeginBlock(res),
)
}
func (app *unsyncLocalClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
res := app.Application.EndBlock(req)
return app.callback(
types.ToRequestEndBlock(req),
types.ToResponseEndBlock(res),
)
}
func (app *unsyncLocalClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes {
res := app.Application.ListSnapshots(req)
return app.callback(
types.ToRequestListSnapshots(req),
types.ToResponseListSnapshots(res),
)
}
func (app *unsyncLocalClient) OfferSnapshotAsync(req types.RequestOfferSnapshot) *ReqRes {
res := app.Application.OfferSnapshot(req)
return app.callback(
types.ToRequestOfferSnapshot(req),
types.ToResponseOfferSnapshot(res),
)
}
func (app *unsyncLocalClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk) *ReqRes {
res := app.Application.LoadSnapshotChunk(req)
return app.callback(
types.ToRequestLoadSnapshotChunk(req),
types.ToResponseLoadSnapshotChunk(res),
)
}
func (app *unsyncLocalClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk) *ReqRes {
res := app.Application.ApplySnapshotChunk(req)
return app.callback(
types.ToRequestApplySnapshotChunk(req),
types.ToResponseApplySnapshotChunk(res),
)
}
func (app *unsyncLocalClient) PrepareProposalAsync(req types.RequestPrepareProposal) *ReqRes {
res := app.Application.PrepareProposal(req)
return app.callback(
types.ToRequestPrepareProposal(req),
types.ToResponsePrepareProposal(res),
)
}
func (app *unsyncLocalClient) ProcessProposalAsync(req types.RequestProcessProposal) *ReqRes {
res := app.Application.ProcessProposal(req)
return app.callback(
types.ToRequestProcessProposal(req),
types.ToResponseProcessProposal(res),
)
}
//-------------------------------------------------------
func (app *unsyncLocalClient) FlushSync() error {
return nil
}
func (app *unsyncLocalClient) EchoSync(msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}
func (app *unsyncLocalClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
res := app.Application.Info(req)
return &res, nil
}
func (app *unsyncLocalClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
res := app.Application.DeliverTx(req)
return &res, nil
}
func (app *unsyncLocalClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
return &res, nil
}
func (app *unsyncLocalClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
res := app.Application.Query(req)
return &res, nil
}
func (app *unsyncLocalClient) CommitSync() (*types.ResponseCommit, error) {
res := app.Application.Commit()
return &res, nil
}
func (app *unsyncLocalClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
res := app.Application.InitChain(req)
return &res, nil
}
func (app *unsyncLocalClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
res := app.Application.BeginBlock(req)
return &res, nil
}
func (app *unsyncLocalClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
res := app.Application.EndBlock(req)
return &res, nil
}
func (app *unsyncLocalClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
res := app.Application.ListSnapshots(req)
return &res, nil
}
func (app *unsyncLocalClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
res := app.Application.OfferSnapshot(req)
return &res, nil
}
func (app *unsyncLocalClient) LoadSnapshotChunkSync(
req types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
res := app.Application.LoadSnapshotChunk(req)
return &res, nil
}
func (app *unsyncLocalClient) ApplySnapshotChunkSync(
req types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
res := app.Application.ApplySnapshotChunk(req)
return &res, nil
}
func (app *unsyncLocalClient) PrepareProposalSync(req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
res := app.Application.PrepareProposal(req)
return &res, nil
}
func (app *unsyncLocalClient) ProcessProposalSync(req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
res := app.Application.ProcessProposal(req)
return &res, nil
}
//-------------------------------------------------------
func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.Callback(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

blocksync/metrics.gen.go (new file, 30 lines added)
View File

@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.
package blocksync
import (
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "syncing",
Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
Syncing: discard.NewGauge(),
}
}

blocksync/metrics.go (new file, 19 lines added)
View File

@@ -0,0 +1,19 @@
package blocksync
import (
"github.com/go-kit/kit/metrics"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "blocksync"
)
//go:generate go run ../scripts/metricsgen -struct=Metrics
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is block syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}
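
The two new files above follow the metricsgen pattern: a hand-written Metrics struct annotated with a go:generate directive, plus generated PrometheusMetrics and NopMetrics constructors. A hedged sketch of how a caller might choose between them; the helper name, the `prometheusEnabled` flag, and the import path are assumptions rather than code from this diff:

```go
package example

import "github.com/tendermint/tendermint/blocksync"

// blockSyncMetrics returns Prometheus-backed metrics when instrumentation is
// enabled and no-op metrics otherwise. Extra label pairs (e.g. "chain_id",
// chainID) may be appended after the namespace argument.
func blockSyncMetrics(prometheusEnabled bool, namespace string) *blocksync.Metrics {
	if prometheusEnabled {
		return blocksync.PrometheusMetrics(namespace)
	}
	return blocksync.NopMetrics()
}
```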

View File

@@ -99,6 +99,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
// OnStart implements service.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
if pool.height == 0 {
return errors.New("height not set")
}
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
@@ -111,22 +114,19 @@ func (pool *BlockPool) makeRequestersRoutine() {
break
}
_, numPending, lenRequesters := pool.GetStatus()
switch {
case numPending >= maxPendingRequests:
height, maxPeerHeight, numPending, lenRequesters := pool.GetStatus()
if height >= maxPeerHeight ||
numPending >= maxPendingRequests ||
lenRequesters >= maxTotalRequesters {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
case lenRequesters >= maxTotalRequesters:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
default:
// request for more blocks.
pool.makeNextRequester()
continue
}
// request for more blocks.
pool.makeNextRequester()
}
}
@@ -156,11 +156,11 @@ func (pool *BlockPool) removeTimedoutPeers() {
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
func (pool *BlockPool) GetStatus() (height, maxPeerHeight int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
return pool.height, pool.maxPeerHeight, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// IsCaughtUp returns true if this node is caught up, false - otherwise.
@@ -302,6 +302,7 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
}
if height > pool.maxPeerHeight {
pool.Logger.Info("new max peer height", "height", height)
pool.maxPeerHeight = height
}
}
@@ -388,7 +389,7 @@ func (pool *BlockPool) makeNextRequester() {
err := request.Start()
if err != nil {
request.Logger.Error("Error starting request", "err", err)
pool.Logger.Error("Error starting request", "err", err)
}
}

View File

@@ -32,7 +32,7 @@ const (
type consensusReactor interface {
// for when we switch from blocksync reactor and block sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
SwitchToConsensus(state sm.State, skipWAL bool) error
}
type peerError struct {
@@ -54,16 +54,15 @@ type Reactor struct {
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
blockSync bool
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
metrics *Metrics
}
// NewReactor returns new reactor instance.
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
blockSync bool) *Reactor {
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, metrics *Metrics) *Reactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
@@ -85,9 +84,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
metrics: metrics,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
@@ -101,22 +100,22 @@ func (bcR *Reactor) SetLogger(l log.Logger) {
// OnStart implements service.Service.
func (bcR *Reactor) OnStart() error {
if bcR.blockSync {
err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine(false)
}
return nil
}
// IsSyncing returns whether the node is using blocksync to advance heights
func (bcR *Reactor) IsSyncing() bool {
return bcR.pool.IsRunning()
}
// SwitchToBlockSync is called by the state sync reactor when switching to block sync.
func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
bcR.blockSync = true
bcR.initialState = state
bcR.pool.height = state.LastBlockHeight + 1
if state.LastBlockHeight == 0 {
bcR.pool.height = state.InitialHeight
} else {
bcR.pool.height = state.LastBlockHeight + 1
}
err := bcR.pool.Start()
if err != nil {
return err
@@ -127,7 +126,7 @@ func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
// OnStop implements service.Service.
func (bcR *Reactor) OnStop() {
if bcR.blockSync {
if bcR.pool.IsRunning() {
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
@@ -236,6 +235,8 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
bcR.metrics.Syncing.Set(1)
defer bcR.metrics.Syncing.Set(0)
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
defer trySyncTicker.Stop()
@@ -293,24 +294,28 @@ FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
height, peerHeight, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
"outbound", outbound, "inbound", inbound, "peerHeight", peerHeight)
if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
// TODO: node struct should be responsible for switching from block sync to
// consensus. It's messy to have to grab the consensus reactor from the switch
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
err := conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
bcR.Logger.Error("failed to switch to consensus", "err", err)
}
// else {
// should only happen during testing
// }
break FOR_LOOP
return
}
case <-trySyncTicker.C: // chan time
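
After the changes above, the blocksync reactor takes a *Metrics value at construction and no longer starts its pool in OnStart; syncing begins only when SwitchToBlockSync is called, which now also handles a fresh chain by starting from state.InitialHeight. A rough sketch of the new call shape; the helper function, its parameters, and the import paths are assumptions layered on top of the signatures shown in this diff, not the actual node wiring:

```go
package example

import (
	"github.com/tendermint/tendermint/blocksync"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
)

// startBlockSync constructs the reactor with explicit metrics and then
// switches block sync on. In the real node the reactor is added to the p2p
// switch and started before SwitchToBlockSync is invoked.
func startBlockSync(
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore *store.BlockStore,
	metrics *blocksync.Metrics,
) (*blocksync.Reactor, error) {
	bcR := blocksync.NewReactor(state.Copy(), blockExec, blockStore, metrics)

	if err := bcR.SwitchToBlockSync(state); err != nil {
		return nil, err
	}
	// IsSyncing reports whether the underlying block pool is still running.
	_ = bcR.IsSyncing()
	return bcR, nil
}
```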

View File

@@ -95,14 +95,6 @@ func newReactor(
mock.Anything,
mock.Anything).Return(nil)
// Make the Reactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
db := dbm.NewMemDB()
stateStore = sm.NewStore(db, sm.StoreOptions{
DiscardABCIResponses: false,
})
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mp, sm.EmptyEvidencePool{}, blockStore)
if err = stateStore.Save(state); err != nil {
@@ -145,7 +137,7 @@ func newReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, NopMetrics())
bcReactor.SetLogger(logger.With("module", "blocksync"))
return ReactorPair{bcReactor, proxyApp}
@@ -156,6 +148,9 @@ func TestNoBlockResponse(t *testing.T) {
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
maxBlockHeight := int64(65)
reactorPairs := make([]ReactorPair, 2)
@@ -169,6 +164,12 @@ func TestNoBlockResponse(t *testing.T) {
}, p2p.Connect2Switches)
for _, reactor := range reactorPairs {
// turn on the syncing algorithm
err := reactor.reactor.SwitchToBlockSync(state)
require.NoError(t, err)
}
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
@@ -218,6 +219,9 @@ func TestBadBlockStopsPeer(t *testing.T) {
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
maxBlockHeight := int64(148)
// Other chain needs a different validator set
@@ -244,6 +248,12 @@ func TestBadBlockStopsPeer(t *testing.T) {
}, p2p.Connect2Switches)
for _, reactor := range reactorPairs {
// turn on the syncing algorithm
err := reactor.reactor.SwitchToBlockSync(state)
require.NoError(t, err)
}
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
@@ -287,6 +297,11 @@ func TestBadBlockStopsPeer(t *testing.T) {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
otherState, err := sm.MakeGenesisState(otherGenDoc)
require.NoError(t, err)
err = lastReactorPair.reactor.SwitchToBlockSync(otherState)
require.NoError(t, err)
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break

View File

@@ -67,7 +67,7 @@ func copyConfig(home, dir string) error {
func dumpProfile(dir, addr, profile string, debug int) error {
endpoint := fmt.Sprintf("%s/debug/pprof/%s?debug=%d", addr, profile, debug)
//nolint:gosec,nolintlint
//nolint:all
resp, err := http.Get(endpoint)
if err != nil {
return fmt.Errorf("failed to query for %s profile: %w", profile, err)

View File

@@ -708,28 +708,11 @@ type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - (default) FIFO mempool.
// 2) "v1" - prioritized mempool.
Version string `mapstructure:"version"`
// RootDir is the root directory for all data. This should be configured via
// the $TMHOME env variable or --home cmd flag rather than overriding this
// struct field.
RootDir string `mapstructure:"home"`
// Recheck (default: true) defines whether Tendermint should recheck the
// validity for all remaining transaction in the mempool after a block.
// Since a block affects the application state, some transactions in the
// mempool may become invalid. If this does not apply to your application,
// you can disable rechecking.
Recheck bool `mapstructure:"recheck"`
// Broadcast (default: true) defines whether the mempool should relay
// transactions to other peers. Setting this to false will stop the mempool
// from relaying transactions to other peers until they are included in a
// block. In other words, if Broadcast is disabled, only the peer you send
// the tx to will see it until it is included in a block.
Broadcast bool `mapstructure:"broadcast"`
// WalPath (default: "") configures the location of the Write Ahead Log
// (WAL) for the mempool. The WAL is disabled by default. To enable, set
// WalPath to where you want the WAL to be written (e.g.
// "data/mempool.wal").
WalPath string `mapstructure:"wal_dir"`
Version string `mapstructure:"version"`
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
Broadcast bool `mapstructure:"broadcast"`
WalPath string `mapstructure:"wal_dir"`
// Maximum number of transactions in the mempool
Size int `mapstructure:"size"`
// Limit the total size of all txs in the mempool.

View File

@@ -349,24 +349,8 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
# 2) "v1" - prioritized mempool.
version = "{{ .Mempool.Version }}"
# Recheck (default: true) defines whether Tendermint should recheck the
# validity for all remaining transaction in the mempool after a block.
# Since a block affects the application state, some transactions in the
# mempool may become invalid. If this does not apply to your application,
# you can disable rechecking.
recheck = {{ .Mempool.Recheck }}
# Broadcast (default: true) defines whether the mempool should relay
# transactions to other peers. Setting this to false will stop the mempool
# from relaying transactions to other peers until they are included in a
# block. In other words, if Broadcast is disabled, only the peer you send
# the tx to will see it until it is included in a block.
broadcast = {{ .Mempool.Broadcast }}
# WalPath (default: "") configures the location of the Write Ahead Log
# (WAL) for the mempool. The WAL is disabled by default. To enable, set
# WalPath to where you want the WAL to be written (e.g.
# "data/mempool.wal").
wal_dir = "{{ js .Mempool.WalPath }}"
# Maximum number of transactions in the mempool
@@ -452,7 +436,7 @@ chunk_fetchers = "{{ .StateSync.ChunkFetchers }}"
[blocksync]
# Block Sync version to use:
#
#
# In v0.37, v1 and v2 of the block sync protocols were deprecated.
# Please use v0 instead.
#

View File

@@ -46,6 +46,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
appFunc := newKVStore
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
css := make([]*State, nValidators)
for i := 0; i < nValidators; i++ {
@@ -54,7 +56,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -102,7 +103,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
@@ -125,7 +126,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, nValidators)
for i := 0; i < nValidators; i++ {
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
// Note, we dont start the consensus states
reactors[i] = NewReactor(css[i])
reactors[i].SetLogger(css[i].Logger)
// eventBus is already started with the cs
@@ -254,8 +256,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// start the consensus reactors
for i := 0; i < nValidators; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, false)
require.NoError(t, reactors[i].SwitchToConsensus(state.Copy(), false))
}
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -314,7 +315,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
N := 4
logger := consensusLogger().With("test", "byzantine")
app := newKVStore
css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app)
css, cleanup := randConsensusNet(t, N, "consensus_byzantine_test", newMockTickerFunc(false), app)
defer cleanup()
// give the byzantine validator a normal ticker
@@ -363,7 +364,8 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
conR := NewReactor(css[i], true) // so we don't start the consensus states
// Note, we don't start the consensus states
conR := NewReactor(css[i])
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)
@@ -407,13 +409,13 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// note these must be started before the byz
for i := 1; i < N; i++ {
cr := reactors[i].(*Reactor)
cr.SwitchToConsensus(cr.conS.GetState(), false)
require.NoError(t, cr.SwitchToConsensus(cr.conS.GetState(), false))
}
// start the byzantine state machine
byzR := reactors[0].(*ByzantineReactor)
s := byzR.reactor.conS.GetState()
byzR.reactor.SwitchToConsensus(s, false)
require.NoError(t, byzR.reactor.SwitchToConsensus(s, false))
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
@@ -592,7 +594,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.waitSync {
if br.reactor.conS.IsRunning() {
br.reactor.sendNewRoundStepMessage(peer)
}
}

View File

@@ -440,7 +440,7 @@ func newStateWithConfigAndBlockStore(
}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
@@ -747,18 +747,17 @@ func consensusLogger() log.Logger {
}).With("module", "consensus")
}
func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker,
func randConsensusNet(t *testing.T, nValidators int, testName string, tickerFunc func() TimeoutTicker,
appFunc func() abci.Application, configOpts ...func(*cfg.Config)) ([]*State, cleanupFunc) {
t.Helper()
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
css := make([]*State, nValidators)
logger := consensusLogger()
configRootDirs := make([]string, 0, nValidators)
for i := 0; i < nValidators; i++ {
stateDB := dbm.NewMemDB() // each state needs its own db
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
configRootDirs = append(configRootDirs, thisConfig.RootDir)
for _, opt := range configOpts {
@@ -772,6 +771,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
css[i].updateToState(state.Copy())
}
return css, func() {
for _, dir := range configRootDirs {

View File

@@ -19,7 +19,7 @@ import (
// Ensure a testnet makes blocks
func TestReactorInvalidPrecommit(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
for i := 0; i < 4; i++ {

View File

@@ -118,18 +118,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "latest_block_height",
Help: "The latest block height.",
}, labels).With(labelsAndValues...),
BlockSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_syncing",
Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "state_syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
@@ -208,8 +196,6 @@ func NopMetrics() *Metrics {
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
BlockSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
StepDurationSeconds: discard.NewHistogram(),
BlockGossipPartsReceived: discard.NewCounter(),

View File

@@ -61,10 +61,6 @@ type Metrics struct {
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge `metrics_name:"latest_block_height"`
// Whether or not a node is block syncing. 1 if yes, 0 if no.
BlockSyncing metrics.Gauge
// Whether or not a node is state syncing. 1 if yes, 0 if no.
StateSyncing metrics.Gauge
// Number of block parts transmitted by each peer.
BlockParts metrics.Counter `metrics_labels:"peer_id"`

View File

@@ -42,7 +42,6 @@ type Reactor struct {
conS *State
mtx tmsync.RWMutex
waitSync bool
eventBus *types.EventBus
rs *cstypes.RoundState
@@ -53,12 +52,11 @@ type ReactorOption func(*Reactor)
// NewReactor returns a new Reactor with the given
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
func NewReactor(consensusState *State, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
@@ -72,21 +70,12 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
// OnStart implements BaseService by subscribing to events, which later will be
// broadcasted to other peers and starting state if we're not in block sync.
func (conR *Reactor) OnStart() error {
conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync())
// start routine that computes peer statistics for evaluating peer quality
go conR.peerStatsRoutine()
conR.subscribeToBroadcastEvents()
go conR.updateRoundStateRoutine()
if !conR.WaitSync() {
err := conR.conS.Start()
if err != nil {
return err
}
}
return nil
}
@@ -94,47 +83,34 @@ func (conR *Reactor) OnStart() error {
// state.
func (conR *Reactor) OnStop() {
conR.unsubscribeFromBroadcastEvents()
if err := conR.conS.Stop(); err != nil {
conR.Logger.Error("Error stopping consensus state", "err", err)
}
if !conR.WaitSync() {
if conR.conS.IsRunning() {
if err := conR.conS.Stop(); err != nil {
conR.Logger.Error("Error stopping consensus state", "err", err)
}
conR.conS.Wait()
}
}
func (conR *Reactor) IsConsensusRunning() bool {
return conR.conS.IsRunning()
}
// SwitchToConsensus switches from block_sync mode to consensus mode.
// It resets the state, turns off block_sync, and starts the consensus state-machine
func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) error {
conR.Logger.Info("SwitchToConsensus")
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
if state.LastBlockHeight > state.InitialHeight {
conR.conS.reconstructLastCommit(state)
}
// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
// NewRoundStepMessage.
conR.conS.updateToState(state)
conR.mtx.Lock()
conR.waitSync = false
conR.mtx.Unlock()
conR.Metrics.BlockSyncing.Set(0)
conR.Metrics.StateSyncing.Set(0)
if skipWAL {
conR.conS.doWALCatchup = false
}
err := conR.conS.Start()
if err != nil {
panic(fmt.Sprintf(`Failed to start consensus state: %v
conS:
%+v
conR:
%+v`, err, conR.conS, conR))
}
return conR.conS.Start()
}
// GetChannels implements Reactor
@@ -201,7 +177,7 @@ func (conR *Reactor) AddPeer(peer p2p.Peer) {
// Send our state to peer.
// If we're block_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.WaitSync() {
if conR.conS.IsRunning() {
conR.sendNewRoundStepMessage(peer)
}
}
@@ -311,7 +287,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
case DataChannel:
if conR.WaitSync() {
if !conR.conS.IsRunning() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
@@ -330,7 +306,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
case VoteChannel:
if conR.WaitSync() {
if !conR.conS.IsRunning() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
@@ -352,7 +328,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
case VoteSetBitsChannel:
if conR.WaitSync() {
if !conR.conS.IsRunning() {
conR.Logger.Info("Ignoring message received during sync", "msg", msg)
return
}
@@ -393,13 +369,6 @@ func (conR *Reactor) SetEventBus(b *types.EventBus) {
conR.conS.SetEventBus(b)
}
// WaitSync returns whether the consensus reactor is waiting for state/block sync.
func (conR *Reactor) WaitSync() bool {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.waitSync
}
//--------------------------------------
// subscribeToBroadcastEvents subscribes for new round steps and votes
@@ -543,6 +512,11 @@ OUTER_LOOP:
if !peer.IsRunning() || !conR.IsRunning() {
return
}
if !conR.IsConsensusRunning() {
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
continue OUTER_LOOP
}
rs := conR.getRoundState()
prs := ps.GetRoundState()
@@ -687,7 +661,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
}
return
}
// logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
}
@@ -697,12 +671,16 @@ func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
// Simple hack to throttle logs upon sleep.
var sleeping = 0
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
return
}
if !conR.IsConsensusRunning() {
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
continue
}
rs := conR.getRoundState()
prs := ps.GetRoundState()
@@ -713,14 +691,11 @@ OUTER_LOOP:
sleeping = 0
}
// logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
// "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
continue OUTER_LOOP
continue
}
}
@@ -729,7 +704,7 @@ OUTER_LOOP:
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
continue
}
}
@@ -742,7 +717,7 @@ OUTER_LOOP:
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ps.PickSendVote(commit) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
continue
}
}
}
@@ -759,7 +734,7 @@ OUTER_LOOP:
}
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
continue OUTER_LOOP
continue
}
}
@@ -833,6 +808,11 @@ OUTER_LOOP:
return
}
if !conR.IsConsensusRunning() {
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
continue OUTER_LOOP
}
// Maybe send Height/Round/Prevotes
{
rs := conR.getRoundState()
@@ -918,8 +898,6 @@ OUTER_LOOP:
}
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
continue OUTER_LOOP
}
}

View File

@@ -55,9 +55,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, n)
for i := 0; i < n; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
if err != nil { t.Fatal(err)}*/
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
// Note, we dont start the consensus states
reactors[i] = NewReactor(css[i])
reactors[i].SetLogger(css[i].Logger)
// eventBus is already started with the cs
@@ -88,7 +87,8 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
// TODO: is this still true with new pubsub?
for i := 0; i < n; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, false)
err := reactors[i].SwitchToConsensus(s, false)
require.NoError(t, err)
}
return reactors, blocksSubs, eventBuses
}
@@ -113,7 +113,7 @@ func stopConsensusNet(logger log.Logger, reactors []*Reactor, eventBuses []*type
// Ensure a testnet makes blocks
func TestReactorBasic(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -135,6 +135,8 @@ func TestReactorWithEvidence(t *testing.T) {
// css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
css := make([]*State, nValidators)
logger := consensusLogger()
for i := 0; i < nValidators; i++ {
@@ -142,7 +144,6 @@ func TestReactorWithEvidence(t *testing.T) {
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
@@ -203,7 +204,7 @@ func TestReactorWithEvidence(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs := NewState(thisConfig.Consensus, blockExec, blockStore, mempool, evpool2, WithState(state.Copy()))
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
@@ -237,7 +238,7 @@ func TestReactorWithEvidence(t *testing.T) {
// Ensure a testnet makes blocks when there are txs
func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore,
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore,
func(c *cfg.Config) {
c.Consensus.CreateEmptyBlocks = false
})
@@ -258,7 +259,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, _, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -284,7 +285,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
N := 1
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, _, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -310,7 +311,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
// Test we record stats about votes and block parts from other peers.
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_test", newMockTickerFunc(true), newKVStore)
defer cleanup()
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
@@ -336,6 +337,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
nVals := 4
logger := log.TestingLogger()
css, cleanup := randConsensusNet(
t,
nVals,
"consensus_voting_power_changes_test",
newMockTickerFunc(true),
@@ -531,7 +533,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// Check we can make blocks with skip_timeout_commit=false
func TestReactorWithTimeoutCommit(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore)
css, cleanup := randConsensusNet(t, N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newKVStore)
defer cleanup()
// override default SkipTimeoutCommit == true for tests
for i := 0; i < N; i++ {

View File

@@ -129,8 +129,8 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
}
pb.cs.Wait()
newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
newCS := NewState(pb.cs.config, pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, WithState(pb.genesisState.Copy()))
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()
@@ -332,8 +332,8 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
consensusState := NewState(csConfig, blockExec,
blockStore, mempool, evpool, WithState(state.Copy()))
consensusState.SetEventBus(eventBus)
return consensusState

View File

@@ -67,7 +67,8 @@ func TestMain(m *testing.M) {
func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger()
state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
state, err := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
consensusReplayConfig,
@@ -81,7 +82,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
bytes, _ := os.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)
err := cs.Start()
err = cs.Start()
require.NoError(t, err)
defer func() {
if err := cs.Stop(); err != nil {
@@ -555,40 +556,40 @@ func TestSimulateValidatorsChange(t *testing.T) {
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, false)
testHandshakeReplay(t, 0, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, true)
testHandshakeReplay(t, 0, m, true)
}
}
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, false)
testHandshakeReplay(t, 2, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, true)
testHandshakeReplay(t, 2, m, true)
}
}
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, false)
testHandshakeReplay(t, numBlocks-1, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, true)
testHandshakeReplay(t, numBlocks-1, m, true)
}
}
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, false)
testHandshakeReplay(t, numBlocks, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, true)
testHandshakeReplay(t, numBlocks, m, true)
}
}
@@ -660,25 +661,27 @@ func tempWALWithData(data []byte) string {
// Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) {
var chain []*types.Block
var commits []*types.Commit
var store *mockBlockStore
var stateDB dbm.DB
var genesisState sm.State
func testHandshakeReplay(t *testing.T, nBlocks int, mode uint, testValidatorsChange bool) {
var (
chain []*types.Block
commits []*types.Commit
store *mockBlockStore
stateDB dbm.DB
genesisState sm.State
config *cfg.Config
)
if testValidatorsChange {
testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
defer os.RemoveAll(testConfig.RootDir)
config = ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
defer os.RemoveAll(config.RootDir)
stateDB = dbm.NewMemDB()
genesisState = sim.GenesisState
config = sim.Config
chain = append([]*types.Block{}, sim.Chain...) // copy chain
commits = sim.Commits
store = newMockBlockStore(t, config, genesisState.ConsensusParams)
} else { // test single node
testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
defer os.RemoveAll(testConfig.RootDir)
config = ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
defer os.RemoveAll(config.RootDir)
walBody, err := WALWithNBlocks(t, numBlocks)
require.NoError(t, err)
walFile := tempWALWithData(walBody)
@@ -811,14 +814,11 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
_, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
Validators: validators,
}); err != nil {
panic(err)
}
if err := stateStore.Save(state); err != nil { // save height 1's validatorsInfo
panic(err)
}
})
require.NoError(t, err)
require.NoError(t, stateStore.Save(state))
switch mode {
case 0:
for i := 0; i < nBlocks; i++ {

View File

@@ -148,7 +148,6 @@ type StateOption func(*State)
// NewState returns a new State.
func NewState(
config *cfg.ConsensusConfig,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
@@ -177,13 +176,6 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
}
cs.updateToState(state)
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(nil, "State", cs)
@@ -207,10 +199,19 @@ func (cs *State) SetEventBus(b *types.EventBus) {
}
// StateMetrics sets the metrics.
func StateMetrics(metrics *Metrics) StateOption {
func WithMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}
func WithState(state sm.State) StateOption {
return func(cs *State) {
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
}
cs.updateToState(state)
}
}
// String returns a string.
func (cs *State) String() string {
// better not to access shared variables
@@ -297,6 +298,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
func (cs *State) OnStart() error {
if cs.state.IsEmpty() {
return errors.New("no state to commence consensus on")
}
// We may set the WAL in testing before calling Start, so only OpenWAL if its
// still the nilWAL.
if _, ok := cs.wal.(nilWAL); ok {
@@ -612,7 +617,7 @@ func (cs *State) updateToState(state sm.State) {
// signal the new round step, because other services (eg. txNotifier)
// depend on having an up-to-date peer state!
if state.LastBlockHeight <= cs.state.LastBlockHeight {
cs.Logger.Debug(
cs.Logger.Info(
"ignoring updateToState()",
"new_height", state.LastBlockHeight+1,
"old_height", cs.state.LastBlockHeight+1,
@@ -2275,11 +2280,10 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
return nil
}
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
cs.Logger.Info("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}
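
The hunk above replaces NewState's state argument with functional options: WithState seeds the initial state (and reconstructs LastCommit when the height is above zero), and the old StateMetrics option becomes WithMetrics. A short sketch mirroring the call sites touched in this diff, written as it would appear inside the consensus package; the surrounding blockExec, blockStore, mempool, evpool, state, and logger values are assumed to exist:

```go
// Sketch only: blockExec, blockStore, mempool, evpool, state and logger are
// assumed to have been built already, as in the tests updated by this diff.
cs := NewState(
	thisConfig.Consensus,
	blockExec,
	blockStore,
	mempool,
	evpool,
	WithState(state.Copy()),   // seed initial state; reconstructs LastCommit if height > 0
	WithMetrics(NopMetrics()), // optional; replaces the former StateMetrics option
)
cs.SetLogger(logger)
```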

View File

@@ -121,7 +121,7 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool,
return
}
voteSet := hvs.getVoteSet(vote.Round, vote.Type)
if voteSet == nil {
if voteSet.IsEmpty() {
if rndz := hvs.peerCatchupRounds[peerID]; len(rndz) < 2 {
hvs.addRound(vote.Round)
voteSet = hvs.getVoteSet(vote.Round, vote.Type)
@@ -166,7 +166,7 @@ func (hvs *HeightVoteSet) POLInfo() (polRound int32, polBlockID types.BlockID) {
func (hvs *HeightVoteSet) getVoteSet(round int32, voteType tmproto.SignedMsgType) *types.VoteSet {
rvs, ok := hvs.roundVoteSets[round]
if !ok {
return nil
return &types.VoteSet{}
}
switch voteType {
case tmproto.PrevoteType:

View File

@@ -5,6 +5,7 @@ import (
"os"
"testing"
"github.com/stretchr/testify/require"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/test"
@@ -30,30 +31,23 @@ func TestPeerCatchupRounds(t *testing.T) {
vote999_0 := makeVoteHR(t, 1, 0, 999, privVals)
added, err := hvs.AddVote(vote999_0, "peer1")
if !added || err != nil {
t.Error("Expected to successfully add vote from peer", added, err)
}
require.NoError(t, err)
require.True(t, added)
vote1000_0 := makeVoteHR(t, 1, 0, 1000, privVals)
added, err = hvs.AddVote(vote1000_0, "peer1")
if !added || err != nil {
t.Error("Expected to successfully add vote from peer", added, err)
}
require.NoError(t, err)
require.True(t, added)
vote1001_0 := makeVoteHR(t, 1, 0, 1001, privVals)
added, err = hvs.AddVote(vote1001_0, "peer1")
if err != ErrGotVoteFromUnwantedRound {
t.Errorf("expected GotVoteFromUnwantedRoundError, but got %v", err)
}
if added {
t.Error("Expected to *not* add vote from peer, too many catchup rounds.")
}
require.Error(t, err)
require.Equal(t, ErrGotVoteFromUnwantedRound, err)
require.False(t, added)
added, err = hvs.AddVote(vote1001_0, "peer2")
if !added || err != nil {
t.Error("Expected to successfully add vote from another peer")
}
require.NoError(t, err)
require.True(t, added)
}
func makeVoteHR(t *testing.T, height int64, valIndex, round int32, privVals []types.PrivValidator) *types.Vote {

View File

@@ -86,7 +86,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState := NewState(config.Consensus, blockExec, blockStore, mempool, evpool, WithState(state.Copy()))
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil {

View File

@@ -78,7 +78,6 @@ Note the context/background should be written in the present tense.
- [ADR-039: Peer-Behaviour](./adr-039-peer-behaviour.md)
- [ADR-063: Privval-gRPC](./adr-063-privval-grpc.md)
- [ADR-067: Mempool Refactor](./adr-067-mempool-refactor.md)
- [ADR-071: Proposer-Based Timestamps](./adr-071-proposer-based-timestamps.md)
- [ADR-075: RPC Event Subscription Interface](./adr-075-rpc-subscription.md)
- [ADR-079: Ed25519 Verification](./adr-079-ed25519-verification.md)
- [ADR-081: Protocol Buffers Management](./adr-081-protobuf-mgmt.md)
@@ -115,6 +114,7 @@ None
- [ADR-064: Batch Verification](./adr-064-batch-verification.md)
- [ADR-068: Reverse-Sync](./adr-068-reverse-sync.md)
- [ADR-069: Node Initialization](./adr-069-flexible-node-initialization.md)
- [ADR-071: Proposer-Based Timestamps](./adr-071-proposer-based-timestamps.md)
- [ADR-073: Adopt LibP2P](./adr-073-libp2p.md)
- [ADR-074: Migrate Timeout Parameters to Consensus Parameters](./adr-074-timeout-params.md)
- [ADR-080: Reverse Sync](./adr-080-reverse-sync.md)

View File

@@ -18,52 +18,50 @@ Listen address can be changed in the config file (see
The following metrics are available:
| **Name** | **Type** | **Tags** | **Description** |
|------------------------------------------|-----------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------|
| `abci_connection_method_timing_seconds` | Histogram | `method`, `type` | Timings for each of the ABCI methods |
| `consensus_height` | Gauge | | Height of the chain |
| `consensus_validators` | Gauge | | Number of validators |
| `consensus_validators_power` | Gauge | | Total voting power of all validators |
| `consensus_validator_power` | Gauge | | Voting power of the node if in the validator set |
| `consensus_validator_last_signed_height` | Gauge | | Last height the node signed a block, if the node is a validator |
| `consensus_validator_missed_blocks` | Gauge | | Total amount of blocks missed for the node, if the node is a validator |
| `consensus_missing_validators` | Gauge | | Number of validators who did not sign |
| `consensus_missing_validators_power` | Gauge | | Total voting power of the missing validators |
| `consensus_byzantine_validators` | Gauge | | Number of validators who tried to double sign |
| `consensus_byzantine_validators_power` | Gauge | | Total voting power of the byzantine validators |
| `consensus_block_interval_seconds` | Histogram | | Time between this and last block (Block.Header.Time) in seconds |
| `consensus_rounds` | Gauge | | Number of rounds |
| `consensus_num_txs` | Gauge | | Number of transactions |
| `consensus_total_txs` | Gauge | | Total number of transactions committed |
| `consensus_block_parts` | Counter | `peer_id` | Number of blockparts transmitted by peer |
| `consensus_latest_block_height` | Gauge | | /status sync\_info number |
| `consensus_block_syncing` | Gauge | | Either 0 (not block syncing) or 1 (syncing) |
| `consensus_state_syncing` | Gauge | | Either 0 (not state syncing) or 1 (syncing) |
| `consensus_block_size_bytes` | Gauge | | Block size in bytes |
| `consensus_step_duration` | Histogram | `step` | Histogram of durations for each step in the consensus protocol |
| `consensus_round_duration` | Histogram | | Histogram of durations for all the rounds that have occurred since the process started |
| `consensus_block_gossip_parts_received` | Counter | `matches_current` | Number of block parts received by the node |
| `consensus_quorum_prevote_delay` | Gauge | | Interval in seconds between the proposal timestamp and the timestamp of the earliest prevote that achieved a quorum |
| `consensus_full_prevote_delay` | Gauge | | Interval in seconds between the proposal timestamp and the timestamp of the latest prevote in a round where all validators voted |
| `consensus_proposal_receive_count` | Counter | `status` | Total number of proposals received by the node since process start |
| `consensus_proposal_create_count` | Counter | | Total number of proposals created by the node since process start |
| `consensus_round_voting_power_percent` | Gauge | `vote_type` | A value between 0 and 1.0 representing the percentage of the total voting power per vote type received within a round |
| `consensus_late_votes` | Counter | `vote_type` | Number of votes received by the node since process start that correspond to earlier heights and rounds than this node is currently in. |
| `p2p_message_send_bytes_total` | Counter | `message_type` | Number of bytes sent to all peers per message type |
| `p2p_message_receive_bytes_total` | Counter | `message_type` | Number of bytes received from all peers per message type |
| `p2p_peers` | Gauge | | Number of peers node's connected to |
| `p2p_peer_receive_bytes_total` | Counter | `peer_id`, `chID` | Number of bytes per channel received from a given peer |
| `p2p_peer_send_bytes_total` | Counter | `peer_id`, `chID` | Number of bytes per channel sent to a given peer |
| `p2p_peer_pending_send_bytes` | Gauge | `peer_id` | Number of pending bytes to be sent to a given peer |
| `p2p_num_txs` | Gauge | `peer_id` | Number of transactions submitted by each peer\_id |
| `p2p_pending_send_bytes` | Gauge | `peer_id` | Amount of data pending to be sent to peer |
| `mempool_size` | Gauge | | Number of uncommitted transactions |
| `mempool_tx_size_bytes` | Histogram | | Transaction sizes in bytes |
| `mempool_failed_txs` | Counter | | Number of failed transactions |
| `mempool_recheck_times` | Counter | | Number of transactions rechecked in the mempool |
| `state_block_processing_time` | Histogram | | Time between BeginBlock and EndBlock in ms |
| `state_consensus_param_updates` | Counter | | Number of consensus parameter updates returned by the application since process start |
| `state_validator_set_updates` | Counter | | Number of validator set updates returned by the application since process start |
| **Name** | **Type** | **Tags** | **Description** |
|----------------------------------------|-----------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------|
| abci_connection_method_timing_seconds | Histogram | method, type | Timings for each of the ABCI methods |
| consensus_height | Gauge | | Height of the chain |
| consensus_validators | Gauge | | Number of validators |
| consensus_validators_power | Gauge | | Total voting power of all validators |
| consensus_validator_power | Gauge | | Voting power of the node if in the validator set |
| consensus_validator_last_signed_height | Gauge | | Last height the node signed a block, if the node is a validator |
| consensus_validator_missed_blocks | Gauge | | Total amount of blocks missed for the node, if the node is a validator |
| consensus_missing_validators | Gauge | | Number of validators who did not sign |
| consensus_missing_validators_power | Gauge | | Total voting power of the missing validators |
| consensus_byzantine_validators | Gauge | | Number of validators who tried to double sign |
| consensus_byzantine_validators_power | Gauge | | Total voting power of the byzantine validators |
| consensus_block_interval_seconds | Histogram | | Time between this and last block (Block.Header.Time) in seconds |
| consensus_rounds | Gauge | | Number of rounds |
| consensus_num_txs | Gauge | | Number of transactions |
| consensus_total_txs | Gauge | | Total number of transactions committed |
| consensus_block_parts | counter | peer_id | number of blockparts transmitted by peer |
| consensus_latest_block_height | gauge | | /status sync_info number |
| consensus_block_syncing | gauge | | either 0 (not block syncing) or 1 (syncing) |
| consensus_state_syncing | gauge | | either 0 (not state syncing) or 1 (syncing) |
| consensus_block_size_bytes | Gauge | | Block size in bytes |
| consensus_step_duration | Histogram | step | Histogram of durations for each step in the consensus protocol |
| consensus_round_duration | Histogram | | Histogram of durations for all the rounds that have occurred since the process started |
| consensus_block_gossip_parts_received | Counter | matches_current | Number of block parts received by the node |
| consensus_quorum_prevote_delay | Gauge | | Interval in seconds between the proposal timestamp and the timestamp of the earliest prevote that achieved a quorum |
| consensus_full_prevote_delay | Gauge | | Interval in seconds between the proposal timestamp and the timestamp of the latest prevote in a round where all validators voted |
| consensus_proposal_receive_count | Counter | status | Total number of proposals received by the node since process start |
| consensus_proposal_create_count | Counter | | Total number of proposals created by the node since process start |
| consensus_round_voting_power_percent | Gauge | vote_type | A value between 0 and 1.0 representing the percentage of the total voting power per vote type received within a round |
| consensus_late_votes | Counter | vote_type | Number of votes received by the node since process start that correspond to earlier heights and rounds than this node is currently in. |
| p2p_peers | Gauge | | Number of peers node's connected to |
| p2p_peer_receive_bytes_total | counter | peer_id, chID | number of bytes per channel received from a given peer |
| p2p_peer_send_bytes_total | counter | peer_id, chID | number of bytes per channel sent to a given peer |
| p2p_peer_pending_send_bytes | gauge | peer_id | number of pending bytes to be sent to a given peer |
| p2p_num_txs | gauge | peer_id | number of transactions submitted by each peer_id |
| p2p_pending_send_bytes | gauge | peer_id | amount of data pending to be sent to peer |
| mempool_size | Gauge | | Number of uncommitted transactions |
| mempool_tx_size_bytes | histogram | | transaction sizes in bytes |
| mempool_failed_txs | counter | | number of failed transactions |
| mempool_recheck_times | counter | | number of transactions rechecked in the mempool |
| state_block_processing_time | histogram | | time between BeginBlock and EndBlock in ms |
| state_consensus_param_updates | Counter | | number of consensus parameter updates returned by the application since process start |
| state_validator_set_updates | Counter | | number of validator set updates returned by the application since process start |
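For context, the metrics listed above are only exported when Prometheus instrumentation is enabled in the node configuration. A minimal Go sketch using the config package that appears elsewhere in this diff; the listen address and namespace values are illustrative assumptions, not requirements:

package docexample

import cfg "github.com/tendermint/tendermint/config"

// enableMetrics returns a node configuration with Prometheus instrumentation
// switched on so the metrics documented above are served over HTTP.
func enableMetrics() *cfg.Config {
	conf := cfg.DefaultConfig()
	conf.Instrumentation.Prometheus = true               // turn the exporter on
	conf.Instrumentation.PrometheusListenAddr = ":26660" // assumed listen address
	conf.Instrumentation.Namespace = "tendermint"        // prefix applied to every metric name
	return conf
}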
## Useful queries

go.mod
View File

@@ -21,7 +21,7 @@ require (
github.com/ory/dockertest v3.3.5+incompatible
github.com/pkg/errors v0.9.1
github.com/pointlander/peg v1.0.1
github.com/prometheus/client_golang v1.13.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.37.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
@@ -29,7 +29,7 @@ require (
github.com/sasha-s/go-deadlock v0.3.1
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.14.0
github.com/spf13/viper v1.13.0
github.com/stretchr/testify v1.8.1
github.com/tendermint/tm-db v0.6.6
golang.org/x/crypto v0.1.0
@@ -47,7 +47,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.0
github.com/btcsuite/btcd/btcutil v1.1.2
github.com/cosmos/gogoproto v1.4.2
github.com/gofrs/uuid v4.3.1+incompatible
github.com/gofrs/uuid v4.3.0+incompatible
github.com/google/uuid v1.3.0
github.com/oasisprotocol/curve25519-voi v0.0.0-20220708102147-0a8a51822cae
github.com/vektra/mockery/v2 v2.14.1
@@ -107,7 +107,7 @@ require (
github.com/fatih/color v1.13.0 // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/firefart/nonamedreturns v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/go-chi/chi/v5 v5.0.7 // indirect
github.com/go-critic/go-critic v0.6.5 // indirect
@@ -222,7 +222,7 @@ require (
github.com/sivchari/tenv v1.7.0 // indirect
github.com/sonatard/noctx v0.0.1 // indirect
github.com/sourcegraph/go-diff v0.6.1 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
@@ -255,12 +255,12 @@ require (
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/tools v0.2.0 // indirect
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e // indirect
google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum
View File

@@ -23,15 +23,14 @@ cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPT
cloud.google.com/go v0.78.0/go.mod h1:QjdrLG0uq+YwhjoVOLsS1t7TW8fs36kLs4XO5R5ECHg=
cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb8=
cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0=
cloud.google.com/go v0.104.0 h1:gSmWO7DY1vOm0MVU6DNXM11BWHHsTUmsC5cv1fuW5X8=
cloud.google.com/go v0.100.2 h1:t9Iw5QH5v4XtlEQaCtUY7x6sCABps8sW0acw7e2WQ6Y=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/compute v1.12.1 h1:gKVJMEyqV5c/UnpzjjQbo3Rjvvqpr9B1DFSbJC4OXr0=
cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
cloud.google.com/go/compute v1.6.1 h1:2sMmt8prCn7DPaG4Pmh0N3Inmc8cT8ae5k1M6VJ9Wqc=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
@@ -337,8 +336,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo=
github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -399,8 +398,8 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v4.3.1+incompatible h1:0/KbAdpx3UXAx1kEOWHJeOkpbgRFGHVgv+CFIY7dBJI=
github.com/gofrs/uuid v4.3.1+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.3.0+incompatible h1:CaSVZxm5B+7o45rtab4jC2G37WGYX1zQfuU2i6DSvnc=
github.com/gofrs/uuid v4.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -913,8 +912,8 @@ github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP
github.com/prometheus/client_golang v1.8.0/go.mod h1:O9VU6huf47PktckDQfMTX0Y8tY0/7TSWwj+ITvv0TnM=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.13.1 h1:3gMjIY2+/hzmqhtUC/aQNYldJA6DtH3CgQvwS+02K1c=
github.com/prometheus/client_golang v1.13.1/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU=
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -1035,8 +1034,8 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw=
github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
@@ -1060,8 +1059,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/spf13/viper v1.14.0 h1:Rg7d3Lo706X9tHsJMUjdiwMpHB7W8WnSVOssIY+JElU=
github.com/spf13/viper v1.14.0/go.mod h1:WT//axPky3FdvXHzGw33dNdXXXfFQqmEalje+egj8As=
github.com/spf13/viper v1.13.0 h1:BWSJ/M+f+3nmdz9bxB+bWX28kkALN2ok11D0rSo8EJU=
github.com/spf13/viper v1.13.0/go.mod h1:Icm2xNL3/8uyh/wFuB1jI7TiTNKp8632Nwegu+zgdYw=
github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YEwQ0=
github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc=
@@ -1350,7 +1349,7 @@ golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk=
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 h1:OSnWWcOd/CtWQC2cYSBgbTSJv3ciqd8r54ySIW2y3RE=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1363,8 +1362,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -1459,12 +1458,12 @@ golang.org/x/sys v0.0.0-20211105183446-c75c47738b0c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
@@ -1488,8 +1487,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -1674,8 +1673,8 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20211101144312-62acf1d99145/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e h1:S9GbmC1iCgvbLyAokVCwiO6tVIrU9Y7c5oMx1V/ki/Y=
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s=
google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a h1:GH6UPn3ixhWcKDhpnEC55S75cerLPdpp3hrhfKYjZgw=
google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=

View File

@@ -57,7 +57,6 @@ func TestLightClientAttackEvidence_Lunatic(t *testing.T) {
dbs.New(dbm.NewMemDB(), chainID),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
light.SkippingVerification(minimumTrustLevel),
)
require.NoError(t, err)
@@ -93,7 +92,7 @@ func TestLightClientAttackEvidence_Lunatic(t *testing.T) {
func TestLightClientAttackEvidence_Equivocation(t *testing.T) {
verificationOptions := map[string]light.Option{
"sequential": light.SequentialVerification(),
"skipping": light.SkippingVerification(minimumTrustLevel),
"skipping": light.SkippingVerification(light.DefaultTrustLevel),
}
for s, verificationOption := range verificationOptions {
@@ -231,7 +230,6 @@ func TestLightClientAttackEvidence_ForwardLunatic(t *testing.T) {
light.Logger(log.TestingLogger()),
light.MaxClockDrift(1*time.Second),
light.MaxBlockLag(1*time.Second),
light.SkippingVerification(minimumTrustLevel),
)
require.NoError(t, err)
@@ -299,7 +297,6 @@ func TestLightClientAttackEvidence_ForwardLunatic(t *testing.T) {
light.Logger(log.TestingLogger()),
light.MaxClockDrift(1*time.Second),
light.MaxBlockLag(1*time.Second),
light.SkippingVerification(minimumTrustLevel),
)
require.NoError(t, err)
@@ -330,7 +327,6 @@ func TestClientDivergentTraces1(t *testing.T) {
dbs.New(dbm.NewMemDB(), chainID),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
light.SkippingVerification(minimumTrustLevel),
)
require.Error(t, err)
assert.Contains(t, err.Error(), "does not match primary")
@@ -355,7 +351,6 @@ func TestClientDivergentTraces2(t *testing.T) {
dbs.New(dbm.NewMemDB(), chainID),
light.Logger(log.TestingLogger()),
light.MaxRetryAttempts(1),
light.SkippingVerification(minimumTrustLevel),
)
require.NoError(t, err)
@@ -427,7 +422,6 @@ func TestClientDivergentTraces4(t *testing.T) {
[]provider.Provider{witness},
dbs.New(dbm.NewMemDB(), chainID),
light.Logger(log.TestingLogger()),
light.SkippingVerification(minimumTrustLevel),
)
require.NoError(t, err)

View File

@@ -21,22 +21,22 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
"health": rpcserver.NewRPCFunc(makeHealthFunc(c), ""),
"status": rpcserver.NewRPCFunc(makeStatusFunc(c), ""),
"net_info": rpcserver.NewRPCFunc(makeNetInfoFunc(c), ""),
"blockchain": rpcserver.NewRPCFunc(makeBlockchainInfoFunc(c), "minHeight,maxHeight", rpcserver.Cacheable()),
"genesis": rpcserver.NewRPCFunc(makeGenesisFunc(c), "", rpcserver.Cacheable()),
"genesis_chunked": rpcserver.NewRPCFunc(makeGenesisChunkedFunc(c), "", rpcserver.Cacheable()),
"block": rpcserver.NewRPCFunc(makeBlockFunc(c), "height", rpcserver.Cacheable("height")),
"header": rpcserver.NewRPCFunc(makeHeaderFunc(c), "height", rpcserver.Cacheable("height")),
"header_by_hash": rpcserver.NewRPCFunc(makeHeaderByHashFunc(c), "hash", rpcserver.Cacheable()),
"block_by_hash": rpcserver.NewRPCFunc(makeBlockByHashFunc(c), "hash", rpcserver.Cacheable()),
"block_results": rpcserver.NewRPCFunc(makeBlockResultsFunc(c), "height", rpcserver.Cacheable("height")),
"commit": rpcserver.NewRPCFunc(makeCommitFunc(c), "height", rpcserver.Cacheable("height")),
"tx": rpcserver.NewRPCFunc(makeTxFunc(c), "hash,prove", rpcserver.Cacheable()),
"blockchain": rpcserver.NewRPCFunc(makeBlockchainInfoFunc(c), "minHeight,maxHeight"),
"genesis": rpcserver.NewRPCFunc(makeGenesisFunc(c), ""),
"genesis_chunked": rpcserver.NewRPCFunc(makeGenesisChunkedFunc(c), ""),
"block": rpcserver.NewRPCFunc(makeBlockFunc(c), "height"),
"header": rpcserver.NewRPCFunc(makeHeaderFunc(c), "height"),
"header_by_hash": rpcserver.NewRPCFunc(makeHeaderByHashFunc(c), "hash"),
"block_by_hash": rpcserver.NewRPCFunc(makeBlockByHashFunc(c), "hash"),
"block_results": rpcserver.NewRPCFunc(makeBlockResultsFunc(c), "height"),
"commit": rpcserver.NewRPCFunc(makeCommitFunc(c), "height"),
"tx": rpcserver.NewRPCFunc(makeTxFunc(c), "hash,prove"),
"tx_search": rpcserver.NewRPCFunc(makeTxSearchFunc(c), "query,prove,page,per_page,order_by"),
"block_search": rpcserver.NewRPCFunc(makeBlockSearchFunc(c), "query,page,per_page,order_by"),
"validators": rpcserver.NewRPCFunc(makeValidatorsFunc(c), "height,page,per_page", rpcserver.Cacheable("height")),
"validators": rpcserver.NewRPCFunc(makeValidatorsFunc(c), "height,page,per_page"),
"dump_consensus_state": rpcserver.NewRPCFunc(makeDumpConsensusStateFunc(c), ""),
"consensus_state": rpcserver.NewRPCFunc(makeConsensusStateFunc(c), ""),
"consensus_params": rpcserver.NewRPCFunc(makeConsensusParamsFunc(c), "height", rpcserver.Cacheable("height")),
"consensus_params": rpcserver.NewRPCFunc(makeConsensusParamsFunc(c), "height"),
"unconfirmed_txs": rpcserver.NewRPCFunc(makeUnconfirmedTxsFunc(c), "limit"),
"num_unconfirmed_txs": rpcserver.NewRPCFunc(makeNumUnconfirmedTxsFunc(c), ""),
@@ -47,7 +47,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
// abci API
"abci_query": rpcserver.NewRPCFunc(makeABCIQueryFunc(c), "path,data,height,prove"),
"abci_info": rpcserver.NewRPCFunc(makeABCIInfoFunc(c), "", rpcserver.Cacheable()),
"abci_info": rpcserver.NewRPCFunc(makeABCIInfoFunc(c), ""),
// evidence API
"broadcast_evidence": rpcserver.NewRPCFunc(makeBroadcastEvidenceFunc(c), "evidence"),

View File

@@ -10,30 +10,22 @@ import (
"github.com/tendermint/tendermint/types"
)
// DefaultTrustLevel - We default to assuming that 2/3 in voting power of the validator set must be in common
// between the trusted and untrusted validator set. Guarantees at least 1/3 honest validators signed.
//
// Trust level must at minimum be greater than 1/3 and less than or equal to 1. Assuming that
// 1/3 or less of the validator set are byzantine (Tendermint's standard security guarantee), this equates
// to at least 1 honest node that we can trust in the untrusted signer set.
//
// Practically speaking, a trust level greater than 2/3 does not provide significantly greater
// security as a plethora of other attack vectors become apparent at that level of byzantine.
// The lower the trust level, the fewer intermediary verification steps need to be taken
// between a trusted header and an untrusted header. This is also dependent on how much a validator
// set changes. In chains with low turnover, 2/3 should be a good balance of speed and security.
var DefaultTrustLevel = tmmath.Fraction{Numerator: 2, Denominator: 3}
var (
// DefaultTrustLevel - new header can be trusted if at least one correct
// validator signed it.
DefaultTrustLevel = tmmath.Fraction{Numerator: 1, Denominator: 3}
)
// VerifyNonAdjacent verifies non-adjacent untrustedHeader against
// trustedHeader. It ensures that:
//
// a) trustedHeader can still be trusted (if not, ErrOldHeaderExpired is returned)
// b) untrustedHeader is valid (if not, ErrInvalidHeader is returned)
// c) trustLevel ([1/3, 2/3]) of trustedHeaderVals (or trustedHeaderNextVals)
// c) trustLevel ([1/3, 1]) of trustedHeaderVals (or trustedHeaderNextVals)
// signed correctly (if not, ErrNewValSetCantBeTrusted is returned)
// d) more than 2/3 of untrustedVals have signed h2
// (otherwise, ErrInvalidHeader is returned)
// e) headers are non-adjacent.
//
// maxClockDrift defines how much untrustedHeader.Time can drift into the
// future.
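To make the trade-off described in the comments above concrete, a caller can override the default by passing an explicit fraction to the skipping-verification option. A minimal sketch; the function name is illustrative and the remaining light client constructor arguments are omitted:

package docexample

import (
	tmmath "github.com/tendermint/tendermint/libs/math"
	"github.com/tendermint/tendermint/light"
)

// stricterTrust returns a light client option requiring 2/3 of the trusted
// validator set to have signed the untrusted header, i.e. the stricter level
// discussed above, instead of the package default.
func stricterTrust() light.Option {
	return light.SkippingVerification(tmmath.Fraction{Numerator: 2, Denominator: 3})
}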

View File

@@ -12,9 +12,9 @@ import (
"github.com/tendermint/tendermint/types"
)
const maxClockDrift = 10 * time.Second
var minimumTrustLevel = tmmath.Fraction{Numerator: 1, Denominator: 3}
const (
maxClockDrift = 10 * time.Second
)
func TestVerifyAdjacentHeaders(t *testing.T) {
const (
@@ -271,7 +271,7 @@ func TestVerifyNonAdjacentHeaders(t *testing.T) {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
err := light.VerifyNonAdjacent(header, vals, tc.newHeader, tc.newVals, tc.trustingPeriod,
tc.now, maxClockDrift,
minimumTrustLevel)
light.DefaultTrustLevel)
switch {
case tc.expErr != nil && assert.Error(t, err):

View File

@@ -11,7 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
bc "github.com/tendermint/tendermint/blocksync"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/evidence"
@@ -50,7 +50,6 @@ type Node struct {
privValidator types.PrivValidator // local node's validator key
// network
transport *p2p.MultiplexTransport
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
@@ -60,11 +59,10 @@ type Node struct {
// services
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for block-syncing
mempoolReactor p2p.Reactor // for gossipping transactions
blockStore *store.BlockStore // store the blockchain to disk
bcReactor *blocksync.Reactor // for block-syncing
mempoolReactor p2p.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
stateSyncGenesis sm.State // provides the genesis state for state sync
@@ -79,6 +77,7 @@ type Node struct {
indexerService *txindex.IndexerService
prometheusSrv *http.Server
pprofSrv *http.Server
customReactors map[string]p2p.Reactor
}
// Option sets a parameter for the node.
@@ -98,29 +97,7 @@ type Option func(*Node)
// - STATESYNC
func CustomReactors(reactors map[string]p2p.Reactor) Option {
return func(n *Node) {
for name, reactor := range reactors {
if existingReactor := n.sw.Reactor(name); existingReactor != nil {
n.sw.Logger.Info("Replacing existing reactor with a custom one",
"name", name, "existing", existingReactor, "custom", reactor)
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
// register the new channels to the nodeInfo
// NOTE: This is a bit messy now with the type casting but is
// cleaned up in the following version when NodeInfo is changed from
// an interface to a concrete type
if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
for _, chDesc := range reactor.GetChannels() {
if !ni.HasChannel(chDesc.ID) {
ni.Channels = append(ni.Channels, chDesc.ID)
n.transport.AddChannel(chDesc.ID)
}
}
n.nodeInfo = ni
} else {
n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
}
}
n.customReactors = reactors
}
}
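The option is consumed at construction time, as the node_test.go hunk further down shows. A minimal usage sketch from inside the node package (import aliases as in node.go above); the wrapper name is illustrative, and the caller supplies the config, keys, client creator, logger and reactor:

// newNodeWithCustomReactor is an illustrative wrapper, not part of this diff.
func newNodeWithCustomReactor(
	config *cfg.Config,
	pv types.PrivValidator,
	nodeKey *p2p.NodeKey,
	clientCreator proxy.ClientCreator,
	logger log.Logger,
	myReactor p2p.Reactor,
) (*Node, error) {
	return NewNode(
		config,
		pv,
		nodeKey,
		clientCreator,
		DefaultGenesisDocProviderFunc(config),
		DefaultDBProvider,
		DefaultMetricsProvider(config.Instrumentation),
		logger,
		// register (or replace) a reactor under the given name before the switch starts
		CustomReactors(map[string]p2p.Reactor{"FOO": myReactor}),
	)
}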
@@ -160,74 +137,26 @@ func NewNode(config *cfg.Config,
return nil, err
}
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
proxyApp, err := createProxyAppConns(clientCreator, logger, abciMetrics)
if err != nil {
return nil, err
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus, err := createAndStartEventBus(logger)
if err != nil {
return nil, err
}
eventBus := createEventBus(logger)
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
indexerService, txIndexer, blockIndexer, err := createIndexerService(config,
genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}
// If an address is provided, listen on the socket for a connection from an
// external signing process.
if config.PrivValidatorListenAddr != "" {
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator socket client: %w", err)
}
}
pubKey, err := privValidator.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
// Determine whether we should attempt state sync.
stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, fmt.Errorf("cannot load state: %w", err)
}
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, consensusLogger)
// Make MempoolReactor
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
@@ -249,21 +178,16 @@ func NewNode(config *cfg.Config,
)
// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, bsMetrics, logger)
if err != nil {
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
csMetrics.StateSyncing.Set(1)
} else if blockSync {
csMetrics.BlockSyncing.Set(1)
}
consensusLogger := logger.With("module", "consensus")
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger,
config, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, eventBus, consensusLogger,
)
// Set up state sync reactor, and schedule a sync if requested.
@@ -275,22 +199,15 @@ func NewNode(config *cfg.Config,
proxyApp.Snapshot(),
proxyApp.Query(),
config.StateSync.TempDir,
ssMetrics,
)
stateSyncReactor.SetLogger(logger.With("module", "statesync"))
nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport.
transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)
// Setup Switch.
p2pLogger := logger.With("module", "p2p")
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
config, p2pMetrics, mempoolReactor, bcReactor,
stateSyncReactor, consensusReactor, evidenceReactor, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -303,7 +220,7 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
addrBook, err := createAddrBook(config, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
@@ -322,7 +239,8 @@ func NewNode(config *cfg.Config,
// Note we currently use the addrBook regardless at least for AddOurAddress
var pexReactor *pex.Reactor
if config.P2P.PexReactor {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactor = createPEXReactor(addrBook, config, logger)
sw.AddReactor("PEX", pexReactor)
}
// Add private IDs to addrbook to block those peers being added
@@ -333,11 +251,9 @@ func NewNode(config *cfg.Config,
genesisDoc: genDoc,
privValidator: privValidator,
transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
sw: sw,
addrBook: addrBook,
nodeKey: nodeKey,
stateStore: stateStore,
blockStore: blockStore,
@@ -347,7 +263,6 @@ func NewNode(config *cfg.Config,
consensusState: consensusState,
consensusReactor: consensusReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
evidencePool: evidencePool,
@@ -363,6 +278,15 @@ func NewNode(config *cfg.Config,
option(node)
}
for name, reactor := range node.customReactors {
if existingReactor := node.sw.Reactor(name); existingReactor != nil {
node.sw.Logger.Info("Replacing existing reactor with a custom one",
"name", name, "existing", existingReactor, "custom", reactor)
node.sw.RemoveReactor(name, existingReactor)
}
node.sw.AddReactor(name, reactor)
}
return node, nil
}
@@ -375,6 +299,77 @@ func (n *Node) OnStart() error {
time.Sleep(genTime.Sub(now))
}
// If an address is provided, listen on the socket for a connection from an
// external signing process. This will overwrite the privvalidator provided in the constructor
if n.config.PrivValidatorListenAddr != "" {
var err error
n.privValidator, err = createPrivValidatorSocketClient(n.config.PrivValidatorListenAddr, n.genesisDoc.ChainID, n.Logger)
if err != nil {
return fmt.Errorf("error with private validator socket client: %w", err)
}
}
pubKey, err := n.privValidator.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
state, err := n.stateStore.LoadFromDBOrGenesisDoc(n.genesisDoc)
if err != nil {
return fmt.Errorf("cannot load state: %w", err)
}
// Determine whether we should attempt state sync.
stateSync := n.config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
n.Logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if !stateSync {
if err := doHandshake(n.stateStore, state, n.blockStore, n.genesisDoc, n.eventBus, n.proxyApp, n.Logger); err != nil {
return err
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = n.stateStore.Load()
if err != nil {
return fmt.Errorf("cannot load state: %w", err)
}
}
nodeInfo, err := makeNodeInfo(n.config, n.nodeKey, n.txIndexer, n.genesisDoc, state)
if err != nil {
return err
}
for _, reactor := range n.customReactors {
for _, chDesc := range reactor.GetChannels() {
if !nodeInfo.HasChannel(chDesc.ID) {
nodeInfo.Channels = append(nodeInfo.Channels, chDesc.ID)
}
}
}
n.nodeInfo = nodeInfo
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
if err != nil {
return err
}
// Setup Transport.
transport, peerFilters := createTransport(n.config, n.nodeInfo, n.nodeKey, addr, n.proxyApp)
n.sw.SetTransport(transport)
n.sw.SetPeerFilters(peerFilters...)
n.sw.SetNodeInfo(n.nodeInfo)
n.sw.SetNodeKey(n.nodeKey)
// run pprof server if it is enabled
if n.config.RPC.IsPprofEnabled() {
n.pprofSrv = n.startPprofServer()
@@ -395,16 +390,13 @@ func (n *Node) OnStart() error {
n.rpcListeners = listeners
}
// Start the transport.
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(*addr); err != nil {
if err := n.eventBus.Start(); err != nil {
return err
}
n.isListening = true
if err := n.indexerService.Start(); err != nil {
return err
}
// Start the switch (the P2P server).
err = n.sw.Start()
@@ -412,23 +404,37 @@ func (n *Node) OnStart() error {
return err
}
n.isListening = true
// Always connect to persistent peers
err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent_peers field: %w", err)
}
// Run state sync
if n.stateSync {
bcR, ok := n.bcReactor.(blockSyncReactor)
if !ok {
return fmt.Errorf("this blocksync reactor does not support switching from state sync")
}
err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := n.config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, n.Logger)
// Run start up phases
if stateSync {
err := startStateSync(n.stateSyncReactor, n.bcReactor, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
if err != nil {
return fmt.Errorf("failed to start state sync: %w", err)
}
} else if blockSync {
err := n.bcReactor.SwitchToBlockSync(state)
if err != nil {
return fmt.Errorf("failed to start block sync: %w", err)
}
} else {
err := n.consensusReactor.SwitchToConsensus(state, false)
if err != nil {
return fmt.Errorf("failed to switch to consensus: %w", err)
}
}
return nil
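The net effect of the hunks above is a two-phase lifecycle: NewNode only wires components together, while the privval connection, handshake, sync decision and service start-up now happen in OnStart. A minimal caller-side sketch mirroring the node_test.go changes below; the function name is illustrative:

// runNode is an illustrative sketch of the construct-then-start flow.
func runNode(config *cfg.Config, logger log.Logger) error {
	n, err := DefaultNewNode(config, logger) // construction only; nothing is started yet
	if err != nil {
		return err
	}
	if err := n.Start(); err != nil { // OnStart: privval client, handshake, state/block sync, switch
		return err
	}
	defer func() { _ = n.Stop() }()
	// ... block until a shutdown signal ...
	return nil
}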
@@ -453,10 +459,6 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}
n.isListening = false
// finally stop the listeners / external services
@@ -518,6 +520,8 @@ func (n *Node) ConfigureRPC() error {
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
ConsensusReactor: n.consensusReactor,
BlocksyncReactor: n.bcReactor,
StatesyncReactor: n.stateSyncReactor,
EventBus: n.eventBus,
Mempool: n.mempool,
@@ -651,7 +655,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}
// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on addr.
// collectors on the provided address.
func (n *Node) startPrometheusServer() *http.Server {
srv := &http.Server{
Addr: n.config.Instrumentation.PrometheusListenAddr,
@@ -672,7 +676,7 @@ func (n *Node) startPrometheusServer() *http.Server {
return srv
}
// starts a ppro
// starts a pprof server at the specified listen address
func (n *Node) startPprofServer() *http.Server {
srv := &http.Server{
Addr: n.config.RPC.PprofListenAddress,
@@ -682,7 +686,7 @@ func (n *Node) startPprofServer() *http.Server {
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
n.Logger.Error("pprof HTTP server ListenAndServe", "err", err)
n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
}
}()
return srv
@@ -793,7 +797,7 @@ func makeNodeInfo(
Network: genDoc.ChainID,
Version: version.TMCoreSemVer,
Channels: []byte{
bc.BlocksyncChannel,
blocksync.BlocksyncChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
mempl.MempoolChannel,
evidence.EvidenceChannel,

View File

@@ -123,6 +123,8 @@ func TestNodeSetAppVersion(t *testing.T) {
// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck
// default config uses the kvstore app
var appVersion = kvstore.ProtocolVersion
@@ -189,6 +191,8 @@ func TestNodeSetPrivValTCP(t *testing.T) {
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}
@@ -201,7 +205,7 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix
_, err := DefaultNewNode(config, log.TestingLogger())
assert.Error(t, err)
require.NoError(t, err)
}
func TestNodeSetPrivValIPC(t *testing.T) {
@@ -451,7 +455,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
RecvMessageCapacity: 100,
},
}
customBlocksyncReactor := p2pmock.NewReactor()
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
@@ -464,7 +467,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),
CustomReactors(map[string]p2p.Reactor{"FOO": cr}),
)
require.NoError(t, err)
@@ -475,9 +478,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.True(t, cr.IsRunning())
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
assert.True(t, customBlocksyncReactor.IsRunning())
assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC"))
channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)
assert.Contains(t, channels, cr.Channels[0].ID)

View File

@@ -12,7 +12,7 @@ import (
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
bc "github.com/tendermint/tendermint/blocksync"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
@@ -99,20 +99,22 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
}
// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics)
// DefaultMetricsProvider returns Metrics built using the Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID),
blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics()
}
}
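Callers supplying their own MetricsProvider have to match the widened signature. A minimal no-op provider, mirroring the fallback branch above; the function name is illustrative:

// noopMetricsProvider is an illustrative provider matching the new signature.
func noopMetricsProvider() MetricsProvider {
	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) {
		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics()
	}
}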
@@ -138,7 +140,7 @@ func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.Block
return
}
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
func createProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, metrics)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
@@ -147,16 +149,13 @@ func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.L
return proxyApp, nil
}
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
func createEventBus(logger log.Logger) *types.EventBus {
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
return eventBus, nil
return eventBus
}
func createAndStartIndexerService(
func createIndexerService(
config *cfg.Config,
chainID string,
dbProvider DBProvider,
@@ -197,10 +196,6 @@ func createAndStartIndexerService(
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService.SetLogger(logger.With("module", "txindex"))
if err := indexerService.Start(); err != nil {
return nil, nil, nil, err
}
return indexerService, txIndexer, blockIndexer, nil
}
@@ -222,7 +217,7 @@ func doHandshake(
return nil
}
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger) {
// Log the version info.
logger.Info("Version info",
"tendermint_version", version.TMCoreSemVer,
@@ -243,9 +238,9 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
logger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
} else {
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
logger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
}
}
@@ -334,12 +329,12 @@ func createBlocksyncReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
blockSync bool,
metrics *blocksync.Metrics,
logger log.Logger,
) (bcReactor p2p.Reactor, err error) {
) (bcReactor *blocksync.Reactor, err error) {
switch config.BlockSync.Version {
case "v0":
bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync)
bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, metrics)
case "v1", "v2":
return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
default:
@@ -351,31 +346,28 @@ func createBlocksyncReactor(config *cfg.Config,
}
func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool mempl.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger,
) (*cs.Reactor, *cs.State) {
consensusState := cs.NewState(
config.Consensus,
state.Copy(),
blockExec,
blockStore,
mempool,
evidencePool,
cs.StateMetrics(csMetrics),
cs.WithMetrics(csMetrics),
)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
consensusReactor := cs.NewReactor(consensusState, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)
// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
@@ -387,6 +379,7 @@ func createTransport(
config *cfg.Config,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
netAddr *p2p.NetAddress,
proxyApp proxy.AppConns,
) (
*p2p.MultiplexTransport,
@@ -394,7 +387,7 @@ func createTransport(
) {
var (
mConnConfig = p2p.MConnConfig(config.P2P)
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, *netAddr, mConnConfig)
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
@@ -453,23 +446,17 @@ func createTransport(
}
func createSwitch(config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
peerFilters []p2p.PeerFilterFunc,
mempoolReactor p2p.Reactor,
bcReactor p2p.Reactor,
stateSyncReactor *statesync.Reactor,
consensusReactor *cs.Reactor,
evidenceReactor *evidence.Reactor,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
p2pLogger log.Logger,
) *p2p.Switch {
sw := p2p.NewSwitch(
config.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
@@ -477,15 +464,10 @@ func createSwitch(config *cfg.Config,
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
return sw
}
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
func createAddrBook(config *cfg.Config,
p2pLogger log.Logger, nodeKey *p2p.NodeKey,
) (pex.AddrBook, error) {
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
@@ -506,15 +488,10 @@ func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
}
addrBook.AddOurAddress(addr)
}
sw.SetAddrBook(addrBook)
return addrBook, nil
}
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger,
) *pex.Reactor {
func createPEXReactor(addrBook pex.AddrBook, config *cfg.Config, logger log.Logger) *pex.Reactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook,
&pex.ReactorConfig{
@@ -529,7 +506,6 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
})
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}
@@ -575,16 +551,17 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React
}
if blockSync {
// FIXME Very ugly to have these metrics bleed through here.
conR.Metrics.StateSyncing.Set(0)
conR.Metrics.BlockSyncing.Set(1)
err = bcR.SwitchToBlockSync(state)
if err != nil {
ssR.Logger.Error("Failed to switch to block sync", "err", err)
return
}
} else {
conR.SwitchToConsensus(state, true)
err := conR.SwitchToConsensus(state, true)
if err != nil {
ssR.Logger.Error("Failed to switch to consensus", "err", err)
return
}
}
}()
return nil
@@ -659,7 +636,7 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
return nil
}
func createAndStartPrivValidatorSocketClient(
func createPrivValidatorSocketClient(
listenAddr,
chainID string,
logger log.Logger,
@@ -674,12 +651,6 @@ func createAndStartPrivValidatorSocketClient(
return nil, fmt.Errorf("failed to start private validator: %w", err)
}
// try to get a pubkey from private validator first time
_, err = pvsc.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
const (
retries = 50 // 50 * 100ms = 5s total
timeout = 100 * time.Millisecond

View File

@@ -1,6 +1,7 @@
package p2p
import (
"errors"
"fmt"
"math"
"sync"
@@ -107,7 +108,6 @@ type SwitchOption func(*Switch)
// NewSwitch creates a new Switch with the given config.
func NewSwitch(
cfg *config.P2PConfig,
transport Transport,
options ...SwitchOption,
) *Switch {
@@ -121,7 +121,6 @@ func NewSwitch(
dialing: cmap.NewCMap(),
reconnecting: cmap.NewCMap(),
metrics: NopMetrics(),
transport: transport,
filterTimeout: defaultFilterTimeout,
persistentPeersAddrs: make([]*NetAddress, 0),
unconditionalPeerIDs: make(map[ID]struct{}),
@@ -224,11 +223,30 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
sw.nodeKey = nodeKey
}
func (sw *Switch) SetPeerFilters(filters ...PeerFilterFunc) {
sw.peerFilters = filters
}
func (sw *Switch) SetTransport(transport Transport) {
if sw.IsRunning() {
panic("cannot set transport while switch is running")
}
sw.transport = transport
}
//---------------------------------------------------------------------
// Service start/stop
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
if sw.transport == nil {
return errors.New("transport not set")
}
if err := sw.transport.Start(); err != nil {
return err
}
// Start reactors
for _, reactor := range sw.reactors {
err := reactor.Start()
@@ -257,6 +275,10 @@ func (sw *Switch) OnStop() {
sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err)
}
}
if err := sw.transport.Stop(); err != nil {
sw.Logger.Error("closing transport", "err", err)
}
}
//---------------------------------------------------------------------
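With the transport removed from the constructor, a switch must have its transport injected before it is started; OnStart now errors if none is set, and OnStop stops the transport. A minimal sketch of the resulting wiring (the helper name and call order are illustrative, not the actual node setup code):

```go
package sketch

import (
	"github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/p2p"
)

// startSwitch is a hypothetical helper showing the call order implied by the
// changes above: the transport is no longer a constructor argument and must
// be set before Start, otherwise OnStart returns an error.
func startSwitch(cfg *config.Config, transport p2p.Transport, nodeInfo p2p.NodeInfo, nodeKey *p2p.NodeKey) (*p2p.Switch, error) {
	sw := p2p.NewSwitch(cfg.P2P)
	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)
	sw.SetTransport(transport) // panics if the switch is already running
	if err := sw.Start(); err != nil { // starts the transport, then the reactors
		return nil, err
	}
	return sw, nil
}
```

SetTransport intentionally panics on a running switch, so it belongs in setup code only; starting and stopping the transport inside the switch keeps the listener lifecycle in one place.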

View File

@@ -22,6 +22,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p/conn"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
@@ -695,9 +696,16 @@ func TestSwitchAcceptRoutine(t *testing.T) {
}
type errorTransport struct {
service.BaseService
acceptErr error
}
func newErrTransport(acceptErr error) *errorTransport {
t := &errorTransport{acceptErr: acceptErr}
t.BaseService = *service.NewBaseService(nil, "Error Transport", t)
return t
}
func (et errorTransport) NetAddress() NetAddress {
panic("not implemented")
}
@@ -713,7 +721,9 @@ func (errorTransport) Cleanup(Peer) {
}
func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
sw := NewSwitch(cfg)
sw.SetTransport(newErrTransport(ErrFilterTimeout{}))
assert.NotPanics(t, func() {
err := sw.Start()
require.NoError(t, err)
@@ -721,7 +731,8 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
require.NoError(t, err)
})
sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
sw = NewSwitch(cfg)
sw.SetTransport(newErrTransport(ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}))
assert.NotPanics(t, func() {
err := sw.Start()
require.NoError(t, err)
@@ -730,7 +741,8 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
})
// TODO(melekes) check we remove our address from addrBook
sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
sw = NewSwitch(cfg)
sw.SetTransport(newErrTransport(ErrTransportClosed{}))
assert.NotPanics(t, func() {
err := sw.Start()
require.NoError(t, err)

View File

@@ -194,14 +194,11 @@ func MakeSwitch(
panic(err)
}
t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg))
if err := t.Listen(*addr); err != nil {
panic(err)
}
t := NewMultiplexTransport(nodeInfo, nodeKey, *addr, MConnConfig(cfg))
// TODO: let the config be passed in?
sw := initSwitch(i, NewSwitch(cfg, t, opts...))
sw := initSwitch(i, NewSwitch(cfg, opts...))
sw.SetTransport(t)
sw.SetLogger(log.TestingLogger().With("switch", i))
sw.SetNodeKey(&nodeKey)

View File

@@ -11,6 +11,7 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
@@ -59,6 +60,8 @@ type peerConfig struct {
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
type Transport interface {
service.Service
// Listening address.
NetAddress() NetAddress
@@ -72,13 +75,6 @@ type Transport interface {
Cleanup(Peer)
}
// transportLifecycle bundles the methods for callers to control start and stop
// behavior.
type transportLifecycle interface {
Close() error
Listen(NetAddress) error
}
// ConnFilterFunc to be implemented by filter hooks after a new connection has
// been established. The set of existing connections is passed along together
// with all resolved IPs for the new connection.
@@ -136,6 +132,8 @@ func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
service.BaseService
netAddr NetAddress
listener net.Listener
maxIncomingConnections int // see MaxIncomingConnections
@@ -162,15 +160,15 @@ type MultiplexTransport struct {
// Test multiplexTransport for interface completeness.
var _ Transport = (*MultiplexTransport)(nil)
var _ transportLifecycle = (*MultiplexTransport)(nil)
// NewMultiplexTransport returns a tcp connected multiplexed peer.
func NewMultiplexTransport(
nodeInfo NodeInfo,
nodeKey NodeKey,
netAddr NetAddress,
mConfig conn.MConnConfig,
) *MultiplexTransport {
return &MultiplexTransport{
t := &MultiplexTransport{
acceptc: make(chan accept),
closec: make(chan struct{}),
dialTimeout: defaultDialTimeout,
@@ -179,9 +177,12 @@ func NewMultiplexTransport(
mConfig: mConfig,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
netAddr: netAddr,
conns: NewConnSet(),
resolver: net.DefaultResolver,
}
t.BaseService = *service.NewBaseService(nil, "P2P Transport", t)
return t
}
// NetAddress implements Transport.
@@ -234,20 +235,20 @@ func (mt *MultiplexTransport) Dial(
return p, nil
}
// Close implements transportLifecycle.
func (mt *MultiplexTransport) Close() error {
// OnStop implements Service.
func (mt *MultiplexTransport) OnStop() {
close(mt.closec)
if mt.listener != nil {
return mt.listener.Close()
if err := mt.listener.Close(); err != nil {
mt.Logger.Error("closing listener", "err", err)
}
}
return nil
}
// Listen implements transportLifecycle.
func (mt *MultiplexTransport) Listen(addr NetAddress) error {
ln, err := net.Listen("tcp", addr.DialString())
// OnStart implements Service.
func (mt *MultiplexTransport) OnStart() error {
ln, err := net.Listen("tcp", mt.netAddr.DialString())
if err != nil {
return err
}
@@ -256,7 +257,6 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
}
mt.netAddr = addr
mt.listener = ln
go mt.acceptPeers()
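Since MultiplexTransport now embeds service.BaseService and takes its listen address up front, the old Listen/Close pair is replaced by Start/Stop. A hedged sketch of the new construction pattern (the listen address and the helper name are assumptions):

```go
package sketch

import (
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
)

// newStartedTransport is a hypothetical helper: the listen address is now a
// constructor argument, and Start binds the TCP listener and launches the
// accept loop (replacing the old Listen call).
func newStartedTransport(nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey) (*p2p.MultiplexTransport, error) {
	addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), "127.0.0.1:26656"))
	if err != nil {
		return nil, err
	}
	mt := p2p.NewMultiplexTransport(nodeInfo, nodeKey, *addr, conn.DefaultMConnConfig())
	if err := mt.Start(); err != nil {
		return nil, err
	}
	return mt, nil
}
```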

View File

@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
@@ -28,20 +29,26 @@ func emptyNodeInfo() NodeInfo {
func newMultiplexTransport(
nodeInfo NodeInfo,
nodeKey NodeKey,
netAddr NetAddress,
) *MultiplexTransport {
return NewMultiplexTransport(
nodeInfo, nodeKey, conn.DefaultMConnConfig(),
nodeInfo, nodeKey, netAddr, conn.DefaultMConnConfig(),
)
}
func TestTransportMultiplexConnFilter(t *testing.T) {
nodeKey := NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
id := nodeKey.ID()
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
nodeKey,
*addr,
)
id := mt.nodeKey.ID()
MultiplexTransportConnFilters(
func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
@@ -51,14 +58,8 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
},
)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
errc := make(chan error)
@@ -89,13 +90,18 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
}
func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
nodeKey := NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
id := nodeKey.ID()
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
nodeKey,
*addr,
)
id := mt.nodeKey.ID()
MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
MultiplexTransportConnFilters(
@@ -105,14 +111,9 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
},
)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
defer mt.Stop() //nolint:errcheck
errc := make(chan error)
go func() {
@@ -140,6 +141,10 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
mt := newMultiplexTransport(
testNodeInfo(
id, "transport",
@@ -147,26 +152,22 @@ func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
NodeKey{
PrivKey: pv,
},
*addr,
)
MultiplexTransportMaxIncomingConnections(0)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
const maxIncomingConns = 2
MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt)
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
// Connect more peers than max
for i := 0; i <= maxIncomingConns; i++ {
errc := make(chan error)
go testDialer(*laddr, errc)
go testDialer(t, *laddr, errc)
err = <-errc
if i < maxIncomingConns {
@@ -198,7 +199,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
// Setup dialers.
for i := 0; i < nDialers; i++ {
go testDialer(*laddr, errc)
go testDialer(t, *laddr, errc)
}
// Catch connection errors.
@@ -235,23 +236,26 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
}
}
if err := mt.Close(); err != nil {
if err := mt.Stop(); err != nil {
t.Errorf("close errored: %v", err)
}
}
func testDialer(dialAddr NetAddress, errc chan error) {
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
)
func testDialer(t *testing.T, dialAddr NetAddress, errc chan error) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
dialer := newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
*addr,
)
_, err := dialer.Dial(dialAddr, peerConfig{})
_, err = dialer.Dial(dialAddr, peerConfig{})
if err != nil {
errc <- err
return
@@ -319,15 +323,14 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
go func() {
<-slowc
var (
dialer = newMultiplexTransport(
fastNodeInfo,
NodeKey{
PrivKey: fastNodePV,
},
)
)
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer := newMultiplexTransport(
fastNodeInfo,
NodeKey{
PrivKey: fastNodePV,
},
*addr,
)
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -360,17 +363,16 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
errc := make(chan error)
go func() {
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
)
)
pv := ed25519.GenPrivKey()
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer := newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
*addr,
)
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -401,6 +403,7 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
errc := make(chan error)
go func() {
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer := newMultiplexTransport(
testNodeInfo(
PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer",
@@ -408,8 +411,8 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
*addr,
)
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -437,18 +440,16 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
func TestTransportMultiplexDialRejectWrongID(t *testing.T) {
mt := testSetupMultiplexTransport(t)
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
)
)
pv := ed25519.GenPrivKey()
wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey())
addr := NewNetAddress(wrongID, mt.listener.Addr())
dialer := newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
*addr,
)
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -471,14 +472,15 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) {
go func() {
var (
pv = ed25519.GenPrivKey()
addr = NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
dialer = newMultiplexTransport(
testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"),
NodeKey{
PrivKey: pv,
},
*addr,
)
)
addr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
@@ -622,11 +624,16 @@ func TestTransportHandshake(t *testing.T) {
}
func TestTransportAddChannel(t *testing.T) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
require.NoError(t, err)
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
PrivKey: pv,
},
*addr,
)
testChannel := byte(0x01)
@@ -638,27 +645,27 @@ func TestTransportAddChannel(t *testing.T) {
// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
mt = newMultiplexTransport(
testNodeInfo(
id, "transport",
),
NodeKey{
PrivKey: pv,
},
)
)
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
mt := newMultiplexTransport(
testNodeInfo(
id, "transport",
),
NodeKey{
PrivKey: pv,
},
*addr,
)
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
err = mt.Start()
require.NoError(t, err)
t.Cleanup(func() {
mt.Stop() //nolint:errcheck
})
// give the listener some time to get ready
time.Sleep(20 * time.Millisecond)

View File

@@ -39,26 +39,6 @@ func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) {
return abcicli.NewLocalClient(l.mtx, l.app), nil
}
//---------------------------------------------------------------
// unsynchronized local proxy on an in-proc app (no mutex)
type unsyncLocalClientCreator struct {
app types.Application
}
// NewUnsyncLocalClientCreator returns a ClientCreator for the given app, which
// will be running locally. Unlike NewLocalClientCreator, this leaves
// synchronization up to the application.
func NewUnsyncLocalClientCreator(app types.Application) ClientCreator {
return &unsyncLocalClientCreator{
app: app,
}
}
func (l *unsyncLocalClientCreator) NewABCIClient() (abcicli.Client, error) {
return abcicli.NewUnsyncLocalClient(l.app), nil
}
//---------------------------------------------------------------
// remote proxy opens new connections to an external app process
@@ -103,12 +83,6 @@ func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
panic(err)
}
return NewLocalClientCreator(app)
case "e2e_sync":
app, err := e2e.NewSyncApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
}
return NewUnsyncLocalClientCreator(app)
case "noop":
return NewLocalClientCreator(types.NewBaseApplication())
default:

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/tendermint/tendermint/blocksync"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
@@ -16,6 +17,7 @@ import (
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/statesync"
"github.com/tendermint/tendermint/types"
)
@@ -91,6 +93,8 @@ type Environment struct {
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
ConsensusReactor *consensus.Reactor
BlocksyncReactor *blocksync.Reactor
StatesyncReactor *statesync.Reactor
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool
@@ -199,9 +203,5 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
}
func latestUncommittedHeight() int64 {
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
}
return env.BlockStore.Height() + 1
}

View File

@@ -17,23 +17,23 @@ var Routes = map[string]*rpc.RPCFunc{
"health": rpc.NewRPCFunc(Health, ""),
"status": rpc.NewRPCFunc(Status, ""),
"net_info": rpc.NewRPCFunc(NetInfo, ""),
"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight", rpc.Cacheable()),
"genesis": rpc.NewRPCFunc(Genesis, "", rpc.Cacheable()),
"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk", rpc.Cacheable()),
"block": rpc.NewRPCFunc(Block, "height", rpc.Cacheable("height")),
"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash", rpc.Cacheable()),
"block_results": rpc.NewRPCFunc(BlockResults, "height", rpc.Cacheable("height")),
"commit": rpc.NewRPCFunc(Commit, "height", rpc.Cacheable("height")),
"header": rpc.NewRPCFunc(Header, "height", rpc.Cacheable("height")),
"header_by_hash": rpc.NewRPCFunc(HeaderByHash, "hash", rpc.Cacheable()),
"check_tx": rpc.NewRPCFunc(CheckTx, "tx", rpc.Cacheable()),
"tx": rpc.NewRPCFunc(Tx, "hash,prove", rpc.Cacheable()),
"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"),
"genesis": rpc.NewRPCFunc(Genesis, ""),
"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk"),
"block": rpc.NewRPCFunc(Block, "height"),
"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash"),
"block_results": rpc.NewRPCFunc(BlockResults, "height"),
"commit": rpc.NewRPCFunc(Commit, "height"),
"header": rpc.NewRPCFunc(Header, "height"),
"header_by_hash": rpc.NewRPCFunc(HeaderByHash, "hash"),
"check_tx": rpc.NewRPCFunc(CheckTx, "tx"),
"tx": rpc.NewRPCFunc(Tx, "hash,prove"),
"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"),
"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"),
"validators": rpc.NewRPCFunc(Validators, "height,page,per_page", rpc.Cacheable("height")),
"validators": rpc.NewRPCFunc(Validators, "height,page,per_page"),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
"consensus_state": rpc.NewRPCFunc(ConsensusState, ""),
"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height", rpc.Cacheable("height")),
"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height"),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""),
@@ -44,7 +44,7 @@ var Routes = map[string]*rpc.RPCFunc{
// abci API
"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"),
"abci_info": rpc.NewRPCFunc(ABCIInfo, "", rpc.Cacheable()),
"abci_info": rpc.NewRPCFunc(ABCIInfo, ""),
// evidence API
"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"),

View File

@@ -51,6 +51,16 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
votingPower = val.VotingPower
}
phase := "initializing"
switch {
case env.StatesyncReactor.IsSyncing():
phase = "statesync"
case env.BlocksyncReactor.IsSyncing():
phase = "blocksync"
case env.ConsensusReactor.IsConsensusRunning():
phase = "consensus"
}
result := &ctypes.ResultStatus{
NodeInfo: env.P2PTransport.NodeInfo().(p2p.DefaultNodeInfo),
SyncInfo: ctypes.SyncInfo{
@@ -62,7 +72,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
EarliestAppHash: earliestAppHash,
EarliestBlockHeight: earliestBlockHeight,
EarliestBlockTime: time.Unix(0, earliestBlockTimeNano),
CatchingUp: env.ConsensusReactor.WaitSync(),
Phase: phase,
},
ValidatorInfo: ctypes.ValidatorInfo{
Address: env.PubKey.Address(),

View File

@@ -86,7 +86,9 @@ type SyncInfo struct {
EarliestBlockHeight int64 `json:"earliest_block_height"`
EarliestBlockTime time.Time `json:"earliest_block_time"`
CatchingUp bool `json:"catching_up"`
// Phase indicates which processes are advancing state:
// Either statesync, blocksync or consensus
Phase string `json:"phase"`
}
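A client reading `/status` can use the new field alongside `catching_up`. The sketch below is illustrative only: the remote address and helper name are assumptions, and `Phase` exists only with this change.

```go
package sketch

import (
	"context"
	"fmt"

	rpchttp "github.com/tendermint/tendermint/rpc/client/http"
)

// printPhase reads the new phase field from /status; possible values are
// "initializing", "statesync", "blocksync" and "consensus".
func printPhase(remote string) error {
	c, err := rpchttp.New(remote, "/websocket")
	if err != nil {
		return err
	}
	status, err := c.Status(context.Background())
	if err != nil {
		return err
	}
	fmt.Printf("catching_up=%v phase=%s\n", status.SyncInfo.CatchingUp, status.SyncInfo.Phase)
	return nil
}
```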
// Info about the node's validator

View File

@@ -7,10 +7,8 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/exec"
"strings"
"testing"
"time"
@@ -39,7 +37,9 @@ const (
testVal = "acbd"
)
var ctx = context.Background()
var (
ctx = context.Background()
)
type ResultEcho struct {
Value string `json:"value"`
@@ -57,10 +57,6 @@ type ResultEchoDataBytes struct {
Value tmbytes.HexBytes `json:"value"`
}
type ResultEchoWithDefault struct {
Value int `json:"value"`
}
// Define some routes
var Routes = map[string]*server.RPCFunc{
"echo": server.NewRPCFunc(EchoResult, "arg"),
@@ -68,7 +64,6 @@ var Routes = map[string]*server.RPCFunc{
"echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"),
"echo_data_bytes": server.NewRPCFunc(EchoDataBytesResult, "arg"),
"echo_int": server.NewRPCFunc(EchoIntResult, "arg"),
"echo_default": server.NewRPCFunc(EchoWithDefault, "arg", server.Cacheable("arg")),
}
func EchoResult(ctx *types.Context, v string) (*ResultEcho, error) {
@@ -91,14 +86,6 @@ func EchoDataBytesResult(ctx *types.Context, v tmbytes.HexBytes) (*ResultEchoDat
return &ResultEchoDataBytes{v}, nil
}
func EchoWithDefault(ctx *types.Context, v *int) (*ResultEchoWithDefault, error) {
val := -1
if v != nil {
val = *v
}
return &ResultEchoWithDefault{val}, nil
}
func TestMain(m *testing.M) {
setup()
code := m.Run()
@@ -212,47 +199,26 @@ func echoDataBytesViaHTTP(cl client.Caller, bytes tmbytes.HexBytes) (tmbytes.Hex
return result.Value, nil
}
func echoWithDefaultViaHTTP(cl client.Caller, v *int) (int, error) {
params := map[string]interface{}{}
if v != nil {
params["arg"] = *v
}
result := new(ResultEchoWithDefault)
if _, err := cl.Call(ctx, "echo_default", params, result); err != nil {
return 0, err
}
return result.Value, nil
}
func testWithHTTPClient(t *testing.T, cl client.HTTPClient) {
val := testVal
got, err := echoViaHTTP(cl, val)
require.NoError(t, err)
require.Nil(t, err)
assert.Equal(t, got, val)
val2 := randBytes(t)
got2, err := echoBytesViaHTTP(cl, val2)
require.NoError(t, err)
require.Nil(t, err)
assert.Equal(t, got2, val2)
val3 := tmbytes.HexBytes(randBytes(t))
got3, err := echoDataBytesViaHTTP(cl, val3)
require.NoError(t, err)
require.Nil(t, err)
assert.Equal(t, got3, val3)
val4 := tmrand.Intn(10000)
got4, err := echoIntViaHTTP(cl, val4)
require.NoError(t, err)
require.Nil(t, err)
assert.Equal(t, got4, val4)
got5, err := echoWithDefaultViaHTTP(cl, nil)
require.NoError(t, err)
assert.Equal(t, got5, -1)
val6 := tmrand.Intn(10000)
got6, err := echoWithDefaultViaHTTP(cl, &val6)
require.NoError(t, err)
assert.Equal(t, got6, val6)
}
func echoViaWS(cl *client.WSClient, val string) (string, error) {
@@ -267,6 +233,7 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) {
msg := <-cl.ResponsesCh
if msg.Error != nil {
return "", err
}
result := new(ResultEcho)
err = json.Unmarshal(msg.Result, result)
@@ -288,6 +255,7 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) {
msg := <-cl.ResponsesCh
if msg.Error != nil {
return []byte{}, msg.Error
}
result := new(ResultEchoBytes)
err = json.Unmarshal(msg.Result, result)
@@ -431,74 +399,6 @@ func TestWSClientPingPong(t *testing.T) {
time.Sleep(6 * time.Second)
}
func TestJSONRPCCaching(t *testing.T) {
httpAddr := strings.Replace(tcpAddr, "tcp://", "http://", 1)
cl, err := client.DefaultHTTPClient(httpAddr)
require.NoError(t, err)
// Not supplying the arg should result in not caching
params := make(map[string]interface{})
req, err := types.MapToRequest(types.JSONRPCIntID(1000), "echo_default", params)
require.NoError(t, err)
res1, err := rawJSONRPCRequest(t, cl, httpAddr, req)
defer func() { _ = res1.Body.Close() }()
require.NoError(t, err)
assert.Equal(t, "", res1.Header.Get("Cache-control"))
// Supplying the arg should result in caching
params["arg"] = tmrand.Intn(10000)
req, err = types.MapToRequest(types.JSONRPCIntID(1001), "echo_default", params)
require.NoError(t, err)
res2, err := rawJSONRPCRequest(t, cl, httpAddr, req)
defer func() { _ = res2.Body.Close() }()
require.NoError(t, err)
assert.Equal(t, "public, max-age=86400", res2.Header.Get("Cache-control"))
}
func rawJSONRPCRequest(t *testing.T, cl *http.Client, url string, req interface{}) (*http.Response, error) {
reqBytes, err := json.Marshal(req)
require.NoError(t, err)
reqBuf := bytes.NewBuffer(reqBytes)
httpReq, err := http.NewRequest(http.MethodPost, url, reqBuf)
require.NoError(t, err)
httpReq.Header.Set("Content-type", "application/json")
return cl.Do(httpReq)
}
func TestURICaching(t *testing.T) {
httpAddr := strings.Replace(tcpAddr, "tcp://", "http://", 1)
cl, err := client.DefaultHTTPClient(httpAddr)
require.NoError(t, err)
// Not supplying the arg should result in not caching
args := url.Values{}
res1, err := rawURIRequest(t, cl, httpAddr+"/echo_default", args)
defer func() { _ = res1.Body.Close() }()
require.NoError(t, err)
assert.Equal(t, "", res1.Header.Get("Cache-control"))
// Supplying the arg should result in caching
args.Set("arg", fmt.Sprintf("%d", tmrand.Intn(10000)))
res2, err := rawURIRequest(t, cl, httpAddr+"/echo_default", args)
defer func() { _ = res2.Body.Close() }()
require.NoError(t, err)
assert.Equal(t, "public, max-age=86400", res2.Header.Get("Cache-control"))
}
func rawURIRequest(t *testing.T, cl *http.Client, url string, args url.Values) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(args.Encode()))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
return cl.Do(req)
}
func randBytes(t *testing.T) []byte {
n := tmrand.Intn(10) + 2
buf := make([]byte, n)

View File

@@ -55,11 +55,6 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
requests = []types.RPCRequest{request}
}
// Set the default response cache to true unless
// 1. Any RPC request error.
// 2. Any RPC request doesn't allow to be cached.
// 3. Any RPC request has the height argument and the value is 0 (the default).
cache := true
for _, request := range requests {
request := request
@@ -77,13 +72,11 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
responses,
types.RPCInvalidRequestError(request.ID, fmt.Errorf("path %s is invalid", r.URL.Path)),
)
cache = false
continue
}
rpcFunc, ok := funcMap[request.Method]
if !ok || (rpcFunc.ws) {
if !ok || rpcFunc.ws {
responses = append(responses, types.RPCMethodNotFoundError(request.ID))
cache = false
continue
}
ctx := &types.Context{JSONReq: &request, HTTPReq: r}
@@ -95,16 +88,11 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
responses,
types.RPCInvalidParamsError(request.ID, fmt.Errorf("error converting json params to arguments: %w", err)),
)
cache = false
continue
}
args = append(args, fnArgs...)
}
if cache && !rpcFunc.cacheableWithArgs(args) {
cache = false
}
returns := rpcFunc.f.Call(args)
result, err := unreflectResult(returns)
if err != nil {
@@ -115,13 +103,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
}
if len(responses) > 0 {
var wErr error
if cache {
wErr = WriteCacheableRPCResponseHTTP(w, responses...)
} else {
wErr = WriteRPCResponseHTTP(w, responses...)
}
if wErr != nil {
if wErr := WriteRPCResponseHTTP(w, responses...); wErr != nil {
logger.Error("failed to write responses", "res", responses, "err", wErr)
}
}
@@ -146,6 +128,7 @@ func mapParamsToArgs(
params map[string]json.RawMessage,
argsOffset int,
) ([]reflect.Value, error) {
values := make([]reflect.Value, len(rpcFunc.argNames))
for i, argName := range rpcFunc.argNames {
argType := rpcFunc.args[i+argsOffset]
@@ -170,6 +153,7 @@ func arrayParamsToArgs(
params []json.RawMessage,
argsOffset int,
) ([]reflect.Value, error) {
if len(rpcFunc.argNames) != len(params) {
return nil, fmt.Errorf("expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)

View File

@@ -18,8 +18,7 @@ import (
func testMux() *http.ServeMux {
funcMap := map[string]*RPCFunc{
"c": NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"),
"block": NewRPCFunc(func(ctx *types.Context, h int) (string, error) { return "block", nil }, "height", Cacheable("height")),
"c": NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"),
}
mux := http.NewServeMux()
buf := new(bytes.Buffer)
@@ -228,52 +227,3 @@ func TestUnknownRPCPath(t *testing.T) {
require.Equal(t, http.StatusNotFound, res.StatusCode, "should always return 404")
res.Body.Close()
}
func TestRPCResponseCache(t *testing.T) {
mux := testMux()
body := strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": ["1"]}`)
req, _ := http.NewRequest("Get", "http://localhost/", body)
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
// Always expecting back a JSONRPCResponse
require.True(t, statusOK(res.StatusCode), "should always return 2XX")
require.Equal(t, "public, max-age=86400", res.Header.Get("Cache-control"))
_, err := io.ReadAll(res.Body)
res.Body.Close()
require.Nil(t, err, "reading from the body should not give back an error")
// send a request with default height.
body = strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": ["0"]}`)
req, _ = http.NewRequest("Get", "http://localhost/", body)
rec = httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res = rec.Result()
// Always expecting back a JSONRPCResponse
require.True(t, statusOK(res.StatusCode), "should always return 2XX")
require.Equal(t, "", res.Header.Get("Cache-control"))
_, err = io.ReadAll(res.Body)
res.Body.Close()
require.Nil(t, err, "reading from the body should not give back an error")
// send a request with default height, but as empty set of parameters.
body = strings.NewReader(`{"jsonrpc": "2.0","method":"block","id": 0, "params": []}`)
req, _ = http.NewRequest("Get", "http://localhost/", body)
rec = httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res = rec.Result()
// Always expecting back a JSONRPCResponse
require.True(t, statusOK(res.StatusCode), "should always return 2XX")
require.Equal(t, "", res.Header.Get("Cache-control"))
_, err = io.ReadAll(res.Body)
res.Body.Close()
require.Nil(t, err, "reading from the body should not give back an error")
}

View File

@@ -117,22 +117,6 @@ func WriteRPCResponseHTTPError(
// WriteRPCResponseHTTP marshals res as JSON (with indent) and writes it to w.
func WriteRPCResponseHTTP(w http.ResponseWriter, res ...types.RPCResponse) error {
return writeRPCResponseHTTP(w, []httpHeader{}, res...)
}
// WriteCacheableRPCResponseHTTP marshals res as JSON (with indent) and writes
// it to w. Adds cache-control to the response header and sets the expiry to
// one day.
func WriteCacheableRPCResponseHTTP(w http.ResponseWriter, res ...types.RPCResponse) error {
return writeRPCResponseHTTP(w, []httpHeader{{"Cache-Control", "public, max-age=86400"}}, res...)
}
type httpHeader struct {
name string
value string
}
func writeRPCResponseHTTP(w http.ResponseWriter, headers []httpHeader, res ...types.RPCResponse) error {
var v interface{}
if len(res) == 1 {
v = res[0]
@@ -145,9 +129,6 @@ func writeRPCResponseHTTP(w http.ResponseWriter, headers []httpHeader, res ...ty
return fmt.Errorf("json marshal: %w", err)
}
w.Header().Set("Content-Type", "application/json")
for _, header := range headers {
w.Header().Set(header.name, header.value)
}
w.WriteHeader(200)
_, err = w.Write(jsonBytes)
return err
@@ -185,6 +166,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
// Without this, Chrome & Firefox were retrying aborted ajax requests,
// at least to my localhost.
if e := recover(); e != nil {
// If RPCResponse
if res, ok := e.(types.RPCResponse); ok {
if wErr := WriteRPCResponseHTTP(rww, res); wErr != nil {

View File

@@ -112,7 +112,7 @@ func TestWriteRPCResponseHTTP(t *testing.T) {
// one argument
w := httptest.NewRecorder()
err := WriteCacheableRPCResponseHTTP(w, types.NewRPCSuccessResponse(id, &sampleResult{"hello"}))
err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(id, &sampleResult{"hello"}))
require.NoError(t, err)
resp := w.Result()
body, err := io.ReadAll(resp.Body)
@@ -120,7 +120,6 @@ func TestWriteRPCResponseHTTP(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode)
assert.Equal(t, "application/json", resp.Header.Get("Content-Type"))
assert.Equal(t, "public, max-age=86400", resp.Header.Get("Cache-control"))
assert.Equal(t, `{
"jsonrpc": "2.0",
"id": -1,

View File

@@ -63,14 +63,7 @@ func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWrit
}
return
}
resp := types.NewRPCSuccessResponse(dummyID, result)
if rpcFunc.cacheableWithArgs(args) {
err = WriteCacheableRPCResponseHTTP(w, resp)
} else {
err = WriteRPCResponseHTTP(w, resp)
}
if err != nil {
if err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(dummyID, result)); err != nil {
logger.Error("failed to write response", "res", result, "err", err)
return
}

View File

@@ -23,96 +23,40 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo
mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, logger)))
}
type Option func(*RPCFunc)
// Cacheable enables returning a cache control header from RPC functions to
// which it is applied.
//
// `noCacheDefArgs` is a list of argument names that, if omitted or set to
// their defaults when calling the RPC function, will skip the response
// caching.
func Cacheable(noCacheDefArgs ...string) Option {
return func(r *RPCFunc) {
r.cacheable = true
r.noCacheDefArgs = make(map[string]interface{})
for _, arg := range noCacheDefArgs {
r.noCacheDefArgs[arg] = nil
}
}
}
// Ws enables WebSocket communication.
func Ws() Option {
return func(r *RPCFunc) {
r.ws = true
}
}
// Function introspection
// RPCFunc contains the introspected type information for a function
type RPCFunc struct {
f reflect.Value // underlying rpc function
args []reflect.Type // type of each function arg
returns []reflect.Type // type of each return arg
argNames []string // name of each argument
cacheable bool // enable cache control
ws bool // enable websocket communication
noCacheDefArgs map[string]interface{} // a lookup table of args that, if not supplied or are set to default values, cause us to not cache
f reflect.Value // underlying rpc function
args []reflect.Type // type of each function arg
returns []reflect.Type // type of each return arg
argNames []string // name of each argument
ws bool // websocket only
}
// NewRPCFunc wraps a function for introspection.
// f is the function, args are comma separated argument names
func NewRPCFunc(f interface{}, args string, options ...Option) *RPCFunc {
return newRPCFunc(f, args, options...)
func NewRPCFunc(f interface{}, args string) *RPCFunc {
return newRPCFunc(f, args, false)
}
// NewWSRPCFunc wraps a function for introspection and use in the websockets.
func NewWSRPCFunc(f interface{}, args string, options ...Option) *RPCFunc {
options = append(options, Ws())
return newRPCFunc(f, args, options...)
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
return newRPCFunc(f, args, true)
}
// cacheableWithArgs returns whether or not a call to this function is cacheable,
// given the specified arguments.
func (f *RPCFunc) cacheableWithArgs(args []reflect.Value) bool {
if !f.cacheable {
return false
}
// Skip the context variable common to all RPC functions
for i := 1; i < len(f.args); i++ {
// f.argNames does not include the context variable
argName := f.argNames[i-1]
if _, hasDefault := f.noCacheDefArgs[argName]; hasDefault {
// Argument with default value was not supplied
if i >= len(args) {
return false
}
// Argument with default value is set to its zero value
if args[i].IsZero() {
return false
}
}
}
return true
}
func newRPCFunc(f interface{}, args string, options ...Option) *RPCFunc {
func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
var argNames []string
if args != "" {
argNames = strings.Split(args, ",")
}
r := &RPCFunc{
return &RPCFunc{
f: reflect.ValueOf(f),
args: funcArgTypes(f),
returns: funcReturnTypes(f),
argNames: argNames,
ws: ws,
}
for _, opt := range options {
opt(r)
}
return r
}
// return a function's argument types

View File

@@ -216,9 +216,6 @@ paths:
Please refer to
https://docs.tendermint.com/main/tendermint-core/using-tendermint.html#formatting
for formatting/encoding rules.
Upon success, the `Cache-Control` header will be set with the default
maximum age.
parameters:
- in: query
name: tx
@@ -624,12 +621,9 @@ paths:
tags:
- Info
description: |
Get block headers for minHeight <= height <= maxHeight.
Get block headers for minHeight <= height <= maxHeight.
At most 20 items will be returned.
Upon success, the `Cache-Control` header will be set with the default
maximum age.
responses:
"200":
description: Block headers, returned in descending order (highest first).
@@ -659,9 +653,6 @@ paths:
- Info
description: |
Get Header.
If the `height` field is set to a non-default value, upon success, the
`Cache-Control` header will be set with the default maximum age.
responses:
"200":
description: Header information.
@@ -691,9 +682,6 @@ paths:
- Info
description: |
Get Header By Hash.
Upon success, the `Cache-Control` header will be set with the default
maximum age.
responses:
"200":
description: Header informations.
@@ -723,9 +711,6 @@ paths:
- Info
description: |
Get Block.
If the `height` field is set to a non-default value, upon success, the
`Cache-Control` header will be set with the default maximum age.
responses:
"200":
description: Block information.
@@ -755,9 +740,6 @@ paths:
- Info
description: |
Get Block By Hash.
Upon success, the `Cache-Control` header will be set with the default
maximum age.
responses:
"200":
description: Block informations.
@@ -778,7 +760,7 @@ paths:
parameters:
- in: query
name: height
description: height to return. If no height is provided, it will fetch information regarding the latest block.
description: height to return. If no height is provided, it will fetch information regarding the latest block.
schema:
type: integer
default: 0
@@ -787,9 +769,6 @@ paths:
- Info
description: |
Get block_results.
If the `height` field is set to a non-default value, upon success, the
`Cache-Control` header will be set with the default maximum age.
responses:
"200":
description: Block results.
@@ -819,9 +798,6 @@ paths:
- Info
description: |
Get Commit.
If the `height` field is set to a non-default value, upon success, the
`Cache-Control` header will be set with the default maximum age.
responses:
"200":
description: |
@@ -869,11 +845,7 @@ paths:
tags:
- Info
description: |
Get Validators. Validators are sorted first by voting power
(descending), then by address (ascending).
If the `height` field is set to a non-default value, upon success, the
`Cache-Control` header will be set with the default maximum age.
Get Validators. Validators are sorted first by voting power (descending), then by address (ascending).
responses:
"200":
description: Commit results.
@@ -895,9 +867,6 @@ paths:
- Info
description: |
Get genesis.
Upon success, the `Cache-Control` header will be set with the default
maximum age.
responses:
"200":
description: Genesis results.
@@ -976,9 +945,6 @@ paths:
- Info
description: |
Get consensus parameters.
If the `height` field is set to a non-default value, upon success, the
`Cache-Control` header will be set with the default maximum age.
responses:
"200":
description: consensus parameters results.
@@ -1169,14 +1135,14 @@ paths:
parameters:
- in: query
name: hash
description: hash of transaction to retrieve
description: transaction hash to retrieve
required: true
schema:
type: string
example: "0xD70952032620CC4E2737EB8AC379806359D8E0B17B0488F627997A0B043ABDED"
- in: query
name: prove
description: Include proofs of the transaction's inclusion in the block
description: Include proofs of the transaction's inclusion in the block
required: false
schema:
type: boolean
@@ -1185,10 +1151,7 @@ paths:
tags:
- Info
description: |
Get a transaction
Upon success, the `Cache-Control` header will be set with the default
maximum age.
Get a transaction
responses:
"200":
description: Get a transaction`
@@ -1204,15 +1167,12 @@ paths:
$ref: "#/components/schemas/ErrorResponse"
/abci_info:
get:
summary: Get info about the application.
summary: Get some info about the application.
operationId: abci_info
tags:
- ABCI
description: |
Get info about the application.
Upon success, the `Cache-Control` header will be set with the default
maximum age.
Get some info about the application.
responses:
"200":
description: Get some info about the application.

View File

@@ -44,7 +44,7 @@ title: Methods
| version | string | The application software semantic version | 2 |
| app_version | uint64 | The application protocol version | 3 |
| last_block_height | int64 | Latest height for which the app persisted its state | 4 |
| last_block_app_hash | bytes | Latest AppHash returned by `Commit` | 5 |
| last_block_app_hash | bytes | Latest AppHash returned by `FinalizeBlock` | 5 |
* **Usage**:
* Return information about the application state.
@@ -52,7 +52,7 @@ title: Methods
that happens on startup or on recovery.
* The returned `app_version` will be included in the Header of every block.
* Tendermint expects `last_block_app_hash` and `last_block_height` to
be updated and persisted during `Commit`.
be updated during `FinalizeBlock` and persisted during `Commit`.
> Note: Semantic version is a reference to [semantic versioning](https://semver.org/). Semantic versions in info will be displayed as X.X.x.
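For illustration only, an application-side Info handler consistent with the table above might look as follows; `MyApp` and its `height`/`hash` fields are placeholders, and the exact Application interface signature may differ between releases.

```go
package sketch

import (
	abcitypes "github.com/tendermint/tendermint/abci/types"
)

// MyApp is a placeholder; height and hash stand for whatever state the
// application persisted during Commit.
type MyApp struct {
	height int64
	hash   []byte
}

// Info returns the fields described in the table above (sketch only).
func (app *MyApp) Info(_ abcitypes.RequestInfo) abcitypes.ResponseInfo {
	return abcitypes.ResponseInfo{
		Version:          "1.0.0",    // application software semantic version
		AppVersion:       1,          // application protocol version
		LastBlockHeight:  app.height, // latest height for which state was persisted
		LastBlockAppHash: app.hash,   // AppHash computed in FinalizeBlock for that height
	}
}
```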

View File

@@ -4,10 +4,3 @@ parent:
title: P2P
order: 6
---
# Peer-to-Peer Communication
The operation of the p2p adopted in production Tendermint networks is [HERE](./v0.34/).
> This is part of an ongoing [effort](https://github.com/tendermint/tendermint/issues/9089)
> to produce a high-level specification of the operation of the p2p layer.

View File

@@ -1,70 +0,0 @@
# Peer-to-Peer Communication
This document describes the implementation of the peer-to-peer (p2p)
communication layer in Tendermint.
It is part of an [effort](https://github.com/tendermint/tendermint/issues/9089)
to produce a high-level specification of the operation of the p2p layer adopted
in production Tendermint networks.
This documentation, therefore, considers the releases `0.34.*` of Tendermint, more
specifically, the branch [`v0.34.x`](https://github.com/tendermint/tendermint/tree/v0.34.x)
of this repository.
## Overview
A Tendermint network is composed of multiple Tendermint instances, hereafter
called **nodes**, that interact by exchanging messages.
Tendermint assumes a partially-connected network model.
This means that a node is not assumed to be directly connected to every other
node in the network.
Instead, each node is directly connected to a subset of other nodes in the
network, hereafter called its **peers**.
The peer-to-peer (p2p) communication layer is responsible for establishing
connections between nodes in a Tendermint network,
for managing the communication between a node and its peers,
and for intermediating the exchange of messages between peers in Tendermint protocols.
## Contents
The documentation follows the organization of the `p2p` package of Tendermint,
which implements the following abstractions:
- [Transport](./transport.md): establishes secure and authenticated
connections with peers;
- [Switch](./switch.md): responsible for dialing peers and accepting
connections from peers, for managing established connections, and for
routing messages between the reactors and peers,
that is, between local and remote instances of the Tendermint protocols;
- [PEX Reactor](./pex.md): a reactor is the implementation of a protocol which
exchanges messages through the p2p layer. The PEX reactor manages the [Address Book](./addressbook.md) and implements both the [PEX protocol](./pex-protocol.md) and the [Peer Manager](./peer_manager.md) role.
- [Peer Exchange protocol](./pex-protocol.md): enables nodes to exchange peer addresses, thus implementing a peer discovery service;
- [Address Book](./addressbook.md): stores discovered peer addresses and
quality metrics associated to peers with which the node has interacted;
- [Peer Manager](./peer_manager.md): defines when and to which peers a node
should dial, in order to establish outbound connections;
- Finally, [Types](./types.md) and [Configuration](./configuration.md) provide
a list of existing types and configuration parameters used by the p2p layer implementation.
## Further References
Existing documentation referring to the p2p layer:
- https://github.com/tendermint/tendermint/tree/main/spec/p2p: p2p-related
configuration flags; overview of connections, peer instances, and reactors;
overview of peer discovery and node types; peer identity, secure connections
and peer authentication handshake.
- https://github.com/tendermint/tendermint/tree/main/spec/p2p/messages: message
types and channel IDs of Block Sync, Mempool, Evidence, State Sync, PEX, and
Consensus reactors.
- https://docs.tendermint.com/v0.34/tendermint-core: the p2p layer
configuration and operation are documented in several pages.
This content is not necessarily up-to-date; some settings and concepts may
refer to the release `v0.35`, which was [discontinued][v35postmorten].
- https://github.com/tendermint/tendermint/tree/master/docs/tendermint-core/pex:
peer types, peer discovery, peer management overview, address book and peer
ranking. This documentation refers to the release `v0.35`, which was [discontinued][v35postmorten].
[v35postmorten]: https://interchain-io.medium.com/discontinuing-tendermint-v0-35-a-postmortem-on-the-new-networking-layer-3696c811dabc

View File

@@ -1,367 +0,0 @@
# Address Book
The address book tracks information about peers, i.e., about other nodes in the network.
The primary information stored in the address book is peer addresses.
A peer address is composed of a node ID and a network address; a network
address is composed of an IP address or a DNS name plus a port number.
The same node ID can be associated to multiple network addresses.
There are two sources for the addresses stored in the address book.
The [Peer Exchange protocol](./pex-protocol.md) stores in the address book
the peer addresses it discovers, i.e., it learns from connected peers.
And the [Switch](./switch.md) registers the addresses of peers with which it
has interacted: to which it has dialed or from which it has accepted a
connection.
The address book also records additional information about peers with which the
node has interacted, from which it is possible to rank peers.
The Switch reports [connection attempts](#dial-attempts) to a peer address; too
many failed attempts indicate that a peer address is invalid.
Reactors, in their turn, report a peer as [good](#good-peers) when it behaves as
expected, or as a [bad peer](#bad-peers) when it misbehaves.
There are two entities that retrieve peer addresses from the address book.
The [Peer Manager](./peer_manager.md) retrieves peer addresses to dial, so to
establish outbound connections.
This selection is random, but has a configurable bias towards peers that have
been marked as good peers.
The [Peer Exchange protocol](./pex-protocol.md) retrieves random samples of
addresses to offer (send) to peers.
This selection is also random but it includes, in particular for nodes that
operate in seed mode, some bias toward peers marked as good ones.
## Buckets
Peer addresses are stored in buckets.
There are buckets for new addresses and buckets for old addresses.
The buckets for new addresses store addresses of peers about which the node
does not have much information; the first address registered for a peer ID is
always stored in a bucket for new addresses.
The buckets for old addresses store addresses of peers with which the node has
interacted and that were reported as [good peers](#good-peers) by a reactor.
An old address therefore can be seen as an alias for a good address.
> Note that new addresses do not mean bad addresses.
> The addresses of peers marked as [bad peers](#bad-peers) are removed from the
> buckets where they are stored, and temporarily kept in a table of banned peers.
The number of buckets is fixed and there are more buckets for new addresses
(`256`) than buckets for old addresses (`64`), a ratio of 4:1.
Each bucket can store up to `64` addresses.
When a bucket becomes full, the peer address with the lowest ranking is removed
from the bucket.
The first choice is to remove bad addresses, with multiple failed attempts
associated.
In the absence of those, the *oldest* address in the bucket is removed, i.e.,
the address with the oldest last attempt to dial.
When a bucket for old addresses becomes full, the lowest-ranked peer address in
the bucket is moved to a bucket of new addresses.
When a bucket for new addresses becomes full, the lowest-ranked peer address in
the bucket is removed from the address book.
In other words, exceeding old or good addresses are downgraded to new
addresses, while exceeding new addresses are dropped.
The bucket that stores an `address` is defined by the following two methods,
for new and old addresses:
- `calcNewBucket(address, source) = hash(key + groupKey(source) + hash(key + groupKey(address) + groupKey(source)) % newBucketsPerGroup) % newBucketCount`
- `calcOldBucket(address) = hash(key + groupKey(address) + hash(key + address) % oldBucketsPerGroup) % oldBucketCount`
The `key` is a fixed random 96-bit (12-byte) string.
The `groupKey` for an address is a string representing its network group.
The `source` of an address is the address of the peer from which we learn the
address.
The first (internal) hash is reduced to an integer up to `newBucketsPerGroup =
32`, for new addresses, and `oldBucketsPerGroup = 4`, for old addresses.
The second (external) hash is reduced to bucket indexes, in the interval from 0
to the number of new (`newBucketCount = 256`) or old (`oldBucketCount = 64`) buckets.
Notice that new addresses with sources from the same network group are more
likely to end up in the same bucket, and therefore to compete for space in it.
For old addresses, instead, two addresses are more likely to end up in the same
bucket when they belong to the same network group.
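A rough sketch of these bucket computations is shown below; the hash function and the group-key handling are simplified stand-ins, only the constants and the overall structure follow the formulas above.

```go
package addrbook

import (
	"crypto/sha256"
	"encoding/binary"
	"strconv"
)

const (
	newBucketCount     = 256
	oldBucketCount     = 64
	newBucketsPerGroup = 32
	oldBucketsPerGroup = 4
)

// hash64 is a simplified stand-in for the address book's hash function.
func hash64(data string) uint64 {
	sum := sha256.Sum256([]byte(data))
	return binary.BigEndian.Uint64(sum[:8])
}

// calcNewBucket mirrors the first formula above: the inner hash spreads the
// address over newBucketsPerGroup slots, the outer hash selects the bucket.
func calcNewBucket(key, addrGroup, srcGroup string) uint64 {
	inner := hash64(key+addrGroup+srcGroup) % newBucketsPerGroup
	return hash64(key+srcGroup+strconv.FormatUint(inner, 10)) % newBucketCount
}

// calcOldBucket mirrors the second formula, keyed on the address itself and
// its network group rather than on the source.
func calcOldBucket(key, addrGroup, addr string) uint64 {
	inner := hash64(key+addr) % oldBucketsPerGroup
	return hash64(key+addrGroup+strconv.FormatUint(inner, 10)) % oldBucketCount
}
```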
## Adding addresses
The `AddAddress` method adds the address of a peer to the address book.
The added address is associated to a *source* address, which identifies the
node from which the peer address was learned.
Addresses are added to the address book in the following situations:
1. When a peer address is learned via PEX protocol, having the sender
of the PEX message as its source
2. When an inbound peer is added, in this case the peer itself is set as the
source of its own address
3. When the switch is instructed to dial addresses via the `DialPeersAsync`
method, in this case the node itself is set as the source
If the added address contains a node ID that is not registered in the address
book, the address is added to a [bucket](#buckets) of new addresses.
Otherwise, the additional address for an existing node ID is **not added** to
the address book when:
- The last address added with the same node ID is stored in an old bucket, so
it is considered a "good" address
- There are addresses associated to the same node ID stored in
`maxNewBucketsPerAddress = 4` distinct buckets
- Randomly, with a probability that increases exponentially with the number of
buckets in which there is an address with the same node ID.
So, a new address for a node ID which is already present in one bucket is
added with a probability of 50%; if the node ID is present in two buckets, the
probability decreases to 25%; and if it is present in three buckets, the
probability is 12.5% (that is, the probability of adding is 1/2^n, where n is
the number of buckets already containing an address for that node ID).
The new address is also added to the `addrLookup` table, which stores
`knownAddress` entries indexed by their node IDs.
If the new address is from an unknown peer, a new entry is added to the
`addrLookup` table; otherwise, the existing entry is updated with the new
address.
Entries of this table contain, among other fields, the list of buckets where
addresses of a peer are stored.
The `addrLookup` table is used by most of the address book methods (e.g.,
`HasAddress`, `IsGood`, `MarkGood`, `MarkAttempt`), as it provides fast access
to addresses.
### Errors
- if the added address or the associated source address are nil
- if the added address is invalid
- if the added address is the local node's address
- if the added address ID is of a [banned](#bad-peers) peer
- if either the added address or the associated source address IDs are configured as private IDs
- if `routabilityStrict` is set and the address is not routable
- in case of failures computing the bucket for the new address (`calcNewBucket` method)
- if the added address instance, which is a new address, is configured as an
old address (sanity check of `addToNewBucket` method)
## Need for Addresses
The `NeedMoreAddrs` method verifies whether the address book needs more addresses.
It is invoked by the PEX reactor to decide whether to request peer addresses
from a new outbound peer or from a randomly selected connected peer.
The address book needs more addresses when it has less than `1000` addresses
registered, counting all buckets for new and old addresses.
## Pick address
The `PickAddress` method returns an address stored in the address book, chosen
at random with a configurable bias towards new addresses.
It is invoked by the Peer Manager to obtain a peer address to dial, as part of
its `ensurePeers` routine.
The bias starts at 10%, when the node has no outbound peers, increasing by
10% for each outbound peer the node has, up to 90%, when the node has at least
8 outbound peers.
The configured bias is a parameter that influences the probability of choosing
an address from a bucket of new addresses or from a bucket of old addresses.
A second parameter influencing this choice is the number of new and old
addresses stored in the address book.
In the absence of bias (i.e., if the configured bias is 50%), the probability
of picking a new address is given by the square root of the number of new
addresses divided by the sum of the square roots of the numbers of new and old
addresses.
By adding a bias toward new addresses (i.e., configured bias larger than 50%),
the portion of the sample occupied by the square root of the number of new
addresses increases, while the corresponding portion for old addresses decreases.
As a result, it becomes more likely to pick a new address at random from this sample.
> The use of the square roots softens the impact of disproportional numbers of
> new and old addresses in the address book. This is actually the expected
> scenario, as there are 4 times more buckets for new addresses than buckets
> for old addresses.
Once the type of address, new or old, is defined, a non-empty bucket of this
type is selected at random.
From the selected bucket, an address is chosen at random and returned.
If all buckets of the selected type are empty, no address is returned.
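The weighting can be sketched as follows.
This is an illustrative reconstruction of the rule just described, not the
actual method (which also handles empty books and clamps the bias); names are
illustrative.

```go
package sketch

import (
	"math"
	"math/rand"
)

// pickFromOldBucket decides whether PickAddress picks from a bucket of old or
// of new addresses, given the bias towards new addresses (a percentage) and
// the number of new and old addresses currently stored.
func pickFromOldBucket(biasTowardsNew int, nNew, nOld int, rng *rand.Rand) bool {
	newWeight := math.Sqrt(float64(nNew)) * float64(biasTowardsNew)
	oldWeight := math.Sqrt(float64(nOld)) * float64(100-biasTowardsNew)
	// Probability of picking an old address: oldWeight / (oldWeight + newWeight).
	return (newWeight+oldWeight)*rng.Float64() < oldWeight
}
```

With a bias of 50%, both weights reduce to the plain square roots, matching the
unbiased probability described above.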
## Random selection
The `GetSelection` method returns a selection of addresses stored in the
address book, with no bias toward new or old addresses.
It is invoked by the PEX protocol to obtain a list of peer addresses with two
purposes:
- To send to a peer in a PEX response, in the case of outbound peers or of
nodes not operating in seed mode
- To crawl, in the case of nodes operating in seed mode, as part of every
interaction of the `crawlPeersRoutine`
The selection is a random subset of the peer addresses stored in the
`addrLookup` table, which stores the last address added for each peer ID.
The target size of the selection is `23%` (`getSelectionPercent`) of the
number of addresses stored in the address book, but it should not be lower than
`32` (`minGetSelection`) --- if it is, all addresses in the book are returned
--- nor greater than `250` (`maxGetSelection`).
> The random selection is produced by:
> - Retrieving all entries of the `addrLookup` map, which by definition are
> returned in random order.
> - Randomly shuffling the retrieved list, using the Fisher-Yates algorithm
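The size computation and the shuffle can be sketched as follows, using the
constants named above; the slice of addresses and the helper names are
illustrative, and the shuffle modifies the input slice in place.

```go
package sketch

import "math/rand"

const (
	getSelectionPercent = 23
	minGetSelection     = 32
	maxGetSelection     = 250
)

// selectionSize returns how many addresses GetSelection picks from a book
// that currently stores bookSize addresses.
func selectionSize(bookSize int) int {
	n := bookSize * getSelectionPercent / 100
	if n < minGetSelection {
		n = minGetSelection
	}
	if n > maxGetSelection {
		n = maxGetSelection
	}
	if n > bookSize {
		n = bookSize // book smaller than minGetSelection: return all addresses
	}
	return n
}

// selection shuffles the addresses (Fisher-Yates) and returns the first n of them.
func selection(addrs []string, rng *rand.Rand) []string {
	rng.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
	return addrs[:selectionSize(len(addrs))]
}
```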
## Random selection with bias
The `GetSelectionWithBias` method returns a selection of addresses stored in
the address book, with bias toward new addresses.
It is invoked by the PEX protocol to obtain a list of peer addresses to be sent
to a peer in a PEX response.
This method is only invoked by seed nodes, when replying to a PEX request
received from an inbound peer (i.e., a peer that dialed the seed node).
The bias used in this scenario is hard-coded to 30%, meaning that 70% of
the returned addresses are expected to be old addresses.
The number of addresses that compose the selection is computed in the same way
as for the non-biased random selection.
The bias toward new addresses is implemented by requiring that a fraction of
the selected addresses, given by the configured bias interpreted as a
percentage, comes from buckets of new addresses, while the remaining addresses
come from buckets of old addresses.
Since the number of old addresses is typically lower than the number of new
addresses, it is possible that the address book does not have enough old
addresses to include in the selection.
In this case, additional new addresses are included in the selection.
Thus, in practice, the configured bias works as an upper bound on the fraction
of old addresses in the selection, rather than as an exact fraction of new addresses.
To randomly select addresses of a type, the address book considers all
addresses present in every bucket of that type.
This list of all addresses of a type is randomly shuffled, and the requested
number of addresses are retrieved from the tail of this list.
The returned selection contains, at its beginning, a random selection of new
addresses in random order, followed by a random selection of old addresses, in
random order.
## Dial Attempts
The `MarkAttempt` method records a failed attempt to connect to an address.
It is invoked by the Peer Manager when it fails dialing a peer, but the failure
is not in the authentication step (`ErrSwitchAuthenticationFailure` error).
In case of authentication errors, the peer is instead marked as a [bad peer](#bad-peers).
The failed connection attempt is recorded in the address registered for the
peer's ID in the `addrLookup` table, which is the last address added with that ID.
The known address's counter of failed `Attempts` is increased, and the failure
time is registered in `LastAttempt`.
The possible effect of recording multiple failed connection attempts to a peer is
to turn its address into a *bad* address (not to be confused with banned addresses).
A known address becomes bad if it is stored in buckets of new addresses, and
when connection attempts:
- Have not been made over a week, i.e., `LastAttempt` is older than a week
- Have failed 3 times and never succeeded, i.e., the `LastSuccess` field is unset
- Have failed 10 times in the last week, i.e., `LastSuccess` is older than a week
Addresses marked as *bad* are the first candidates to be removed from a bucket of
new addresses when the bucket becomes full.
> Note that failed connection attempts are reported for a peer address, but in
> fact the address book records them for a peer.
>
> More precisely, failed connection attempts are recorded in the entry of the
> `addrLookup` table with reported peer ID, which contains the last address
> added for that node ID, which is not necessarily the reported peer address.
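The criteria above can be summarized by the following sketch.
The structure and the thresholds follow the text; the field and constant names
mirror those mentioned in this document, but the actual implementation may
differ in details.

```go
package sketch

import "time"

const (
	week        = 7 * 24 * time.Hour
	numRetries  = 3  // failures with no success ever recorded
	maxFailures = 10 // failures within the last week
)

type knownAddress struct {
	Attempts    int32
	LastAttempt time.Time
	LastSuccess time.Time
	isOldBucket bool // stored in a bucket of old addresses
}

// isBad tells whether an address is considered bad, following the three
// criteria listed above; only addresses in buckets of new addresses can be bad.
func (ka knownAddress) isBad(now time.Time) bool {
	if ka.isOldBucket {
		return false
	}
	switch {
	case !ka.LastAttempt.IsZero() && ka.LastAttempt.Before(now.Add(-week)):
		return true // no connection attempt over the last week
	case ka.Attempts >= numRetries && ka.LastSuccess.IsZero():
		return true // failed numRetries times and never succeeded
	case ka.Attempts >= maxFailures && ka.LastSuccess.Before(now.Add(-week)):
		return true // failed maxFailures times, no success in the last week
	}
	return false
}
```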
## Good peers
The `MarkGood` method marks a peer ID as good.
It is invoked by the consensus reactor, via the switch, when the number of useful
messages received from a peer is a multiple of `10000`.
Vote and block part messages count towards this number; to be considered
useful, they must be valid and must not be duplicates.
> The `SwitchReporter` type of `behaviour` package also invokes the `MarkGood`
> method when a "reason" associated with consensus votes and block parts is
> reported.
> No reactor, however, currently provides these "reasons" to the `SwitchReporter`.
The effect of this action is that the address registered for the peer's ID in the
`addrLookup` table, which is the last address added with that ID, is marked as
good and moved to a bucket of old addresses.
An address marked as good has its counter of failed connection attempts and the corresponding timestamp reset.
If the destination bucket of old addresses is full, the oldest address in the
bucket is moved (downgraded) to a bucket of new addresses.
Moving the peer address to a bucket of old addresses has the effect of
upgrading, or increasing the ranking of a peer in the address book.
## Bad peers
The `MarkBad` method marks a peer as bad and bans it for a period of time.
This method is only invoked within the PEX reactor, with a banning time of 24
hours, for the following reasons:
- A peer misbehaves in the [PEX protocol](pex-protocol.md#misbehavior)
- When the `maxAttemptsToDial` limit (`16`) is reached for a peer
- If an `ErrSwitchAuthenticationFailure` error is returned when dialing a peer
The effect of this action is that the address registered for the peer's ID in the
`addrLookup` table, which is the last address added with that ID, is banned for
a period of time.
The banned peer is removed from the `addrLookup` table and from all buckets
where its addresses are stored.
The information about banned peers, however, is not discarded.
It is maintained in the `badPeers` map, indexed by peer ID.
This allows, in particular, addresses of banned peers to be
[reinstated](#reinstating-addresses), i.e., to be added
back to the address book, when their ban period expires.
## Reinstating addresses
The `ReinstateBadPeers` method attempts to re-add banned addresses to the address book.
It is invoked by the PEX reactor when dialing new peers.
This action is taken before requesting additional addresses from peers,
when the node needs more peer addresses.
The set of banned peer addresses is retrieved from the `badPeers` map.
Addresses that are no longer banned, i.e., whose ban period has expired,
are added back to the address book as new addresses, while the corresponding
node IDs are removed from the `badPeers` map.
## Removing addresses
The `RemoveAddress` method removes an address from the address book.
It is invoked by the switch when it dials a peer or accepts a connection from a
peer that ends up being the node itself (`IsSelf` error).
In both cases, the address dialed or accepted is also added to the address book
as a local address, via the `AddOurAddress` method.
The same logic is also internally used by the address book for removing
addresses of a peer that is [marked as a bad peer](#bad-peers).
The entry registered with the peer ID of the address in the `addrLookup` table,
which is the last address added with that ID, is removed from all buckets where
it is stored and from the `addrLookup` table.
> FIXME: is it possible that addresses with the same ID as the removed address,
> but with distinct network addresses, are kept in buckets of the address book?
> While they will not be accessible anymore, as there is no reference to them
> in the `addrLookup`, they will still be there.
## Persistence
The `loadFromFile` method, called when the address book is started, reads
address book entries from a file, passed to the address book constructor.
The file, at this point, does not need to exist.
The `saveRoutine` is started when the address book is started.
It saves the address book to the configured file every `dumpAddressInterval`,
hard-coded to 2 minutes.
It is also possible to save the content of the address book using the `Save`
method.
Saving the address book content to a file acquires the address book lock, also
employed by all other public methods.


@@ -1,51 +0,0 @@
# Tendermint p2p configuration
This document contains configurable parameters a node operator can use to tune the p2p behaviour.
| Parameter| Default| Description |
| --- | --- | ---|
| ListenAddress | "tcp://0.0.0.0:26656" | Address to listen for incoming connections (0.0.0.0:0 means any interface, any port) |
| ExternalAddress | "" | Address to advertise to peers for them to dial |
| [Seeds](pex-protocol.md#seed-nodes) | empty | Comma separated list of seed nodes to connect to (ID@host:port )|
| [Persistent peers](peer_manager.md#persistent-peers) | empty | Comma separated list of nodes to keep persistent connections to (ID@host:port ) |
| UPNP | false | UPNP port forwarding enabled |
| [AddrBook](addressbook.md) | defaultAddrBookPath | Path to the address book |
| AddrBookStrict | true | Set true for strict address routability rules and false for private or local networks |
| [MaxNumInboundPeers](switch.md#accepting-peers) | 40 | Maximum number of inbound peers |
| [MaxNumOutboundPeers](peer_manager.md#ensure-peers) | 10 | Maximum number of outbound peers to connect to, excluding persistent peers |
| [UnconditionalPeers](switch.md#accepting-peers) | empty | IDs of peers that are allowed to be (re)connected as either inbound or outbound peers, regardless of whether the node has reached `max_num_inbound_peers` or `max_num_outbound_peers`. |
| PersistentPeersMaxDialPeriod| 0 * time.Second | Maximum pause when redialing a persistent peer (if zero, exponential backoff is used) |
| FlushThrottleTimeout |100 * time.Millisecond| Time to wait before flushing messages out on the connection |
| MaxPacketMsgPayloadSize | 1024 | Maximum size of a message packet payload, in bytes |
| SendRate | 5120000 (5 MB/s) | Rate at which packets can be sent, in bytes/second |
| RecvRate | 5120000 (5 MB/s) | Rate at which packets can be received, in bytes/second |
| [PexReactor](pex.md) | true | Set true to enable the peer-exchange reactor |
| SeedMode | false | Seed mode, in which the node constantly crawls the network looking for peers. Does not work if the peer-exchange reactor is disabled. |
| PrivatePeerIDs | empty | Comma separated list of peer IDs that we do not add to the address book or gossip to other peers. They stay private to us. |
| AllowDuplicateIP | false | Toggle to disable the guard against peers connecting from the same IP. |
| [HandshakeTimeout](transport.md#connection-upgrade) | 20 * time.Second | Timeout for handshake completion between peers |
| [DialTimeout](switch.md#dialing-peers) | 3 * time.Second | Timeout for dialing a peer |
These parameters can be set using the `$TMHOME/config/config.toml` file. A subset of them can also be changed via command line using the following command line flags:
| Parameter | Flag| Example|
| --- | --- | ---|
| Listen address| `p2p.laddr` | "tcp://0.0.0.0:26656" |
| Seed nodes | `p2p.seeds` | `--p2p.seeds “id100000000000000000000000000000000@1.2.3.4:26656,id200000000000000000000000000000000@2.3.4.5:4444”` |
| Persistent peers | `p2p.persistent_peers` | `--p2p.persistent_peers “id100000000000000000000000000000000@1.2.3.4:26656,id200000000000000000000000000000000@2.3.4.5:26656”` |
| Unconditional peers | `p2p.unconditional_peer_ids` | `--p2p.unconditional_peer_ids “id100000000000000000000000000000000,id200000000000000000000000000000000”` |
| UPNP | `p2p.upnp` | `--p2p.upnp` |
| PexReactor | `p2p.pex` | `--p2p.pex` |
| Seed mode | `p2p.seed_mode` | `--p2p.seed_mode` |
| Private peer ids | `p2p.private_peer_ids` | `--p2p.private_peer_ids “id100000000000000000000000000000000,id200000000000000000000000000000000”` |
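For illustration, the same parameters can also be set programmatically when
building the node configuration.
This is only a sketch: it assumes the `config` package of the Tendermint
repository and field names suggested by the parameter names above, so check
them against the version in use.

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/config"
)

func main() {
	cfg := config.DefaultConfig() // defaults match the table above

	// Tune a few p2p parameters (field names assumed from the parameter names).
	cfg.P2P.Seeds = "id100000000000000000000000000000000@1.2.3.4:26656"
	cfg.P2P.PersistentPeers = "id200000000000000000000000000000000@2.3.4.5:26656"
	cfg.P2P.MaxNumOutboundPeers = 10
	cfg.P2P.SeedMode = false

	fmt.Println(cfg.P2P.ListenAddress) // "tcp://0.0.0.0:26656" by default
}
```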
**Note on persistent peers**
If `persistent_peers_max_dial_period` is set to a value greater than zero, the
pause between dials to each persistent peer will not exceed `persistent_peers_max_dial_period`
during exponential backoff, and the node keeps retrying without giving up.
If `seeds` and `persistent_peers` intersect,
the user will be warned that seeds may auto-close connections
and that the node may not be able to keep the connection persistent.



@@ -1,147 +0,0 @@
# Peer Manager
The peer manager is responsible for establishing connections with peers.
It defines when a node should dial peers and which peers it should dial.
The peer manager is not an implementation abstraction of the p2p layer,
but a role that is played by the [PEX reactor](./pex.md).
## Outbound peers
The `ensurePeersRoutine` is a persistent routine intended to ensure that a node
is connected to `MaxNumOutboundPeers` outbound peers.
This routine is continuously executed by regular nodes, i.e. nodes not
operating in seed mode, as part of the PEX reactor implementation.
The logic defining when the node should dial peers, for selecting peers to dial
and for actually dialing them is implemented in the `ensurePeers` method.
This method is periodically invoked -- every `ensurePeersPeriod`, with a default
value of 30 seconds -- by the `ensurePeersRoutine`.
A node is expected to dial peers whenever the number of outbound peers is lower
than the configured `MaxNumOutboundPeers` parameter.
The current number of outbound peers is retrieved from the switch, using the
`NumPeers` method, which also reports the number of nodes to which the switch
is currently dialing.
If the number of outbound peers plus the number of dialing routines equals
`MaxNumOutboundPeers`, nothing is done.
Otherwise, the `ensurePeers` method will attempt to dial node addresses in
order to reach the target number of outbound peers.
Once defined that the node needs additional outbound peers, the node queries
the address book for candidate addresses.
This is done using the [`PickAddress`](./addressbook.md#pick-address) method,
which returns an address selected at random from the address book, with some bias
towards new or old addresses.
When the node has up to 3 outbound peers, the adopted bias is towards old
addresses, i.e., addresses of peers that are believed to be "good".
When the node has 5 or more outbound peers, the adopted bias is towards new
addresses, i.e., addresses of peers about which the node has not yet collected
much information.
So, the more outbound peers a node has, the less conservative it will be when
selecting new peers.
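A sketch of this bias computation, consistent with the description in the
[address book](./addressbook.md#pick-address) document; the helper name is
illustrative.

```go
package sketch

// biasTowardsNewAddrs returns the bias (a percentage) used when picking a
// candidate address to dial: 10% with no outbound peers, plus 10% per
// outbound peer, capped at 90%.
func biasTowardsNewAddrs(numOutboundPeers int) int {
	if numOutboundPeers > 8 {
		numOutboundPeers = 8
	}
	return 10*numOutboundPeers + 10
}
```

The returned value is then passed to the address book, roughly as
`book.PickAddress(biasTowardsNewAddrs(out))`, for each candidate address to dial.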
The selected peer addresses are then dialed in parallel, by starting a dialing
routine per peer address.
Dialing a peer address can fail for multiple reasons.
The node might have attempted to dial the peer too many times.
In this case, the peer address is marked as bad and removed from the address book.
The node might have attempted and failed to dial the peer recently
and the exponential `backoffDuration` has not yet passed.
Or the current connection attempt might fail, which is registered in the address book.
None of these errors are explicitly handled by the `ensurePeers` method, which
also does not wait until the connections are established.
The third step of the `ensurePeers` method is to ensure that the address book
has enough addresses.
This is done, first, by [reinstating banned peers](./addressbook.md#Reinstating-addresses)
whose ban period has expired.
Then, the node randomly selects a connected peer, which can be either an
inbound or an outbound peer, from which to [request addresses](./pex-protocol.md#Requesting-Addresses)
using the PEX protocol.
Last, and this action is only performed if the node could not retrieve any new
address to dial from the address book, the node dials the configured seed nodes
in order to establish a connection to at least one of them.
### Fast dialing
As described above, seed nodes are in fact the last source of peer addresses
for regular nodes.
They are contacted by a node when, after an invocation of the `ensurePeers`
method, no suitable peer address to dial is retrieved from the address book
(e.g., because it is empty).
Once a connection with a seed node is established, the node immediately
[sends a PEX request](./pex-protocol.md#Requesting-Addresses) to it, as it is
added as an outbound peer.
When the corresponding PEX response is received, the addresses provided by the
seed node are added to the address book.
As a result, in the next invocation of the `ensurePeers` method, the node
should be able to dial some of the peer addresses provided by the seed node.
However, as observed in this [issue](https://github.com/tendermint/tendermint/issues/2093),
it can take some time, up to `ensurePeersPeriod` (30 seconds by default), between the
moment the node receives new peer addresses and the moment it dials them.
To avoid this delay, which can be particularly relevant when the node has no
peers, a node immediately attempts to dial peer addresses when they are
received from a peer that is locally configured as a seed node.
> FIXME: The current logic was introduced in [#3762](https://github.com/tendermint/tendermint/pull/3762).
> Although it fixes the issue, namely the delay between receiving an address and
> dialing the peer, it does not impose any limit on how many addresses are dialed
> in this scenario.
> So, all addresses received from a seed node are dialed, regardless of the
> current number of outbound peers, the number of dialing routines, or the
> `MaxNumOutboundPeers` parameter.
>
> Issue [#9548](https://github.com/tendermint/tendermint/issues/9548) was
> created to handle this situation.
### First round
When the PEX reactor is started, the `ensurePeersRoutine` is created and it
runs throughout the operation of a node, periodically invoking the `ensurePeers`
method.
However, if the node already has some peers, either inbound or outbound, or is
dialing some addresses when this persistent routine is started, the first
invocation of `ensurePeers` is delayed by a random amount of time between 0
and `ensurePeersPeriod`.
### Persistent peers
The node configuration can contain a list of *persistent peers*.
These peers receive preferential treatment compared to regular peers, and the
node always tries to keep connections to them.
Moreover, these peers are not removed from the address book in the case of
multiple failed dial attempts.
On startup, the node immediately tries to dial the configured persistent peers
by calling the switch's [`DialPeersAsync`](./switch.md#manual-operation) method.
This is not done in the p2p package, but it is part of the procedure to set up a node.
> TODO: the handling of persistent peers should be described in more detail.
### Life cycle
The picture below is a first attempt of illustrating the life cycle of an outbound peer:
<img src="img/p2p_state.png" width="50%" title="Outgoing peers lifecycle">
A peer can be in the following states:
- Candidate peers: peer addresses stored in the address book, which can be
retrieved via the [`PickAddress`](./addressbook.md#pick-address) method
- [Dialing](switch.md#dialing-peers): peer addresses that are currently being
dialed. This state exists to ensure that a single dialing routine exists per peer.
- [Reconnecting](switch.md#reconnect-to-peer): persistent peers to which a node
is currently reconnecting, as a previous connection attempt has failed.
- Connected peers: peers that a node has successfully dialed, added as outbound peers.
- [Bad peers](addressbook.md#bad-peers): peers marked as bad in the address
book due to exhibited [misbehavior](pex-protocol.md#misbehavior).
Peers can be reinstated after being marked as bad.
## Pending documentation
The `dialSeeds` method of the PEX reactor.
The `dialPeer` method of the PEX reactor.
This includes the `dialAttemptsInfo` and `maxBackoffDurationForPeer` methods.


@@ -1,240 +0,0 @@
# Peer Exchange Protocol
The Peer Exchange (PEX) protocol enables nodes to exchange peer addresses, thus
implementing a peer discovery mechanism.
The PEX protocol uses two messages:
- `PexRequest`: sent by a node to [request](#requesting-addresses) peer
addresses to a peer
- `PexAddrs`: a list of peer addresses [provided](#providing-addresses) to a
peer as response to a `PexRequest` message
While all nodes, with few exceptions, participate in the PEX protocol,
a subset of nodes, configured as [seed nodes](#seed-nodes), plays a particular
role in the protocol.
They crawl the network, connecting to random peers, in order to learn as many
peer addresses as possible to provide to other nodes.
## Requesting Addresses
A node requests peer addresses by sending a `PexRequest` message to a peer.
For regular nodes, not operating in seed mode, a PEX request is sent when
the node *needs* peer addresses, a condition that is checked:
1. When an *outbound* peer is added, causing the node to request addresses from
the new peer
2. Periodically, by the `ensurePeersRoutine`, causing the node to request peer
addresses from a randomly selected peer
A node needs more peer addresses when its address book has
[fewer than 1000 records](./addressbook.md#need-for-addresses).
It is thus reasonable to assume that, in the common case, a node needs more
peer addresses, so PEX requests are sent whenever either of the two situations above occurs.
A PEX request is sent when a new *outbound* peer is added.
The same does not happen with new inbound peers because the implementation
considers outbound peers, which the node has chosen to dial, more
trustworthy than inbound peers, which the node has merely accepted.
Moreover, when a node is short of peer addresses, it dials the configured seed nodes;
since they are added as outbound peers, the node can immediately request peer addresses.
The `ensurePeersRoutine` periodically checks, by default every 30 seconds (`ensurePeersPeriod`),
whether the node has enough outbound peers.
If it does not, the node tries dialing some peer addresses stored in the address book.
As part of this procedure, the node selects a peer at random,
from the set of connected peers retrieved from the switch,
and sends a PEX request to the selected peer.
Sending a PEX request to a peer is implemented by the `RequestAddrs` method of
the PEX reactor.
### Responses
After a PEX request is sent to a peer, the node expects to receive,
as a response, a `PexAddrs` message from the peer.
This message encodes a list of peer addresses that are
[added to address book](./addressbook.md#adding-addresses),
having the peer from which the PEX response was received as their source.
Received PEX responses are handled by the `ReceiveAddrs` method of the PEX reactor.
In the case of a PEX response received from a peer which is configured as
a seed node, the PEX reactor attempts immediately to dial the provided peer
addresses, as detailed [here](./peer_manager.md#fast-dialing).
### Misbehavior
Sending multiple PEX requests to a peer, before receiving a reply from it,
is considered a misbehavior.
To prevent it, the node maintains a `requestsSent` set of outstanding
requests, indexed by destination peers.
While a peer ID is present in the `requestsSent` set, the node does not send
further PEX requests to that peer.
A peer ID is removed from the `requestsSent` set when a PEX response is
received from it.
Sending a PEX response to a peer that has not requested peer addresses
is also considered a misbehavior.
So, if a PEX response is received from a peer that is not registered in
the `requestsSent` set, an `ErrUnsolicitedList` error is produced.
This leads the peer to be disconnected and [marked as a bad peer](addressbook.md#bad-peers).
## Providing Addresses
When a node receives a `PexRequest` message from a peer,
it replies with a `PexAddrs` message.
This message encodes a [random selection of peer addresses](./addressbook.md#random-selection)
retrieved from the address book.
Sending a PEX response to a peer is implemented by the `SendAddrs` method of
the PEX reactor.
### Misbehavior
Requesting peer addresses too often is considered a misbehavior.
Since nodes are expected to send PEX requests every `ensurePeersPeriod`,
the minimum accepted interval between requests from the same peer is set
to `ensurePeersPeriod / 3`, 10 seconds by default.
The `receiveRequest` method is responsible for verifying this condition.
The node keeps a `lastReceivedRequests` map with the time of the last PEX
request received from every peer.
If the interval between successive requests is less than the minimum accepted
one, the peer is disconnected and [marked as a bad peer](addressbook.md#bad-peers).
An exception is made for the first two PEX requests received from a peer.
> The probable reason is that, when a new peer is added, the two conditions for
> a node to request peer addresses can be triggered within an interval shorter than
> the minimum accepted interval.
> Since this is legitimate behavior, it should not be punished.
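The rate-limiting rule can be sketched as follows, reusing the default
`ensurePeersPeriod` mentioned above; the helper and its arguments are
illustrative.

```go
package sketch

import (
	"fmt"
	"time"
)

const ensurePeersPeriod = 30 * time.Second

// minReceiveRequestInterval is the minimum accepted interval between two PEX
// requests from the same peer (10 seconds by default).
const minReceiveRequestInterval = ensurePeersPeriod / 3

// checkRequestInterval returns an error when a peer that has already sent
// requestsReceived PEX requests sends a new one too soon. The first two
// requests from a peer are exempt, as discussed above.
func checkRequestInterval(lastRequest, now time.Time, requestsReceived int) error {
	if requestsReceived < 2 {
		return nil
	}
	if now.Sub(lastRequest) < minReceiveRequestInterval {
		return fmt.Errorf("peer sent PEX request too soon (minimum interval %v)",
			minReceiveRequestInterval)
	}
	return nil
}
```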
## Seed nodes
A seed node is a node configured to operate in `SeedMode`.
### Crawling peers
Seed nodes crawl the network, connecting to random peers and sending PEX
requests to them, in order to learn as many peer addresses as possible.
More specifically, a node operating in seed mode sends PEX requests in two cases:
1. When an outbound peer is added, and the seed node needs more peer addresses,
it requests peer addresses from the new peer
2. Periodically, the `crawlPeersRoutine` sends PEX requests to a random set of
peers, whose addresses are registered in the Address Book
The first case also applies to nodes not operating in seed mode.
The second case replaces, for seed nodes, the periodic requests performed by
regular nodes: seed nodes do not run the `ensurePeersRoutine`, as regular nodes
do, but instead run the `crawlPeersRoutine`, which regular nodes do not run.
The `crawlPeersRoutine` periodically, every 30 seconds (`crawlPeerPeriod`),
starts a new peer discovery round.
First, the seed node retrieves a random selection of peer addresses from its
Address Book.
This selection is produced in the same way as in the random selection of peer
addresses that are [provided](#providing-addresses) to a requesting peer.
Peers that the seed node has crawled recently,
less than 2 minutes ago (`minTimeBetweenCrawls`), are removed from this selection.
The remaining peer addresses are registered in the `crawlPeerInfos` table.
The seed node is not necessarily connected to the peer whose address is
selected for each round of crawling.
So, the seed node dials the selected peer addresses.
This is performed in the foreground, one peer at a time.
As a result, a round of crawling can take a substantial amount of time.
For each selected peer that it succeeds in dialing, including already connected
peers, the seed node sends a PEX request.
Dialing a selected peer address can fail for multiple reasons.
The seed node might have attempted to dial the peer too many times.
In this case, the peer address is marked as [bad in the address book](addressbook.md#bad-peers).
The seed node might have attempted to dial the peer recently, without success,
and the exponential `backoffDuration` has not yet passed.
Or the current connection attempt might fail, which is registered in the address book.
Failures to dial a peer address produce information that is important for
a seed node.
They indicate that a peer is unreachable, or is not operating correctly, and
therefore its address should not be provided to other nodes.
This occurs when, due to multiple failed connection attempts or authentication
failures, the peer address ends up being removed from the address book.
As a result, the periodic crawling of selected peers not only enables the
discovery of new peers, but also allows the seed node to stop providing
addresses of bad peers.
### Offering addresses
Nodes operating in seed mode handle PEX requests differently than regular
nodes, whose operation is described [here](#providing-addresses).
This distinction exists because nodes dial a seed node with the main, if not
exclusive, goal of retrieving peer addresses.
In other words, nodes do not dial a seed node because they intend to have it as
a peer in the multiple Tendermint protocols, but because they believe that a
seed node is a good source of addresses of nodes to which they can establish
connections and interact in the multiple Tendermint protocols.
So, when a seed node receives a `PexRequest` message from an inbound peer,
it sends a `PexAddrs` message, containing a selection of peer
addresses, back to the peer and *disconnects* from it.
Seed nodes therefore treat inbound connections from peers as short-term
connections, exclusively intended to retrieve peer addresses.
Once the requested peer addresses are sent, the connection with the peer is closed.
Moreover, the selection of peer addresses provided to inbound peers by a seed
node, although still essentially random, has a [bias toward old
addresses](./addressbook.md#random-selection-with-bias).
The selection bias is defined by `biasToSelectNewPeers`, hard-coded to `30%`,
meaning that `70%` of the peer addresses provided by a seed node are expected
to be old addresses.
Although this nomenclature is not entirely clear, *old* addresses are the addresses that
have survived the longest in the address book, that is, addresses that the seed
node believes to be from *good* peers (more details [here](./addressbook.md#good-peers)).
Another distinction is in the handling of potential [misbehavior](#misbehavior-1)
of peers requesting addresses.
A seed node does not enforce, a priori, a minimum interval between PEX requests
from inbound peers.
Instead, it does not reply to more than one PEX request per inbound peer
connection, and, as mentioned above, it disconnects from inbound peers after
responding to them.
If the same peer dials the seed node again and requests peer addresses, the
seed node replies to this peer as if it were the first time the peer had requested
peer addresses.
> This is more an implementation restriction than a desired behavior.
> The `lastReceivedRequests` map stores the last time a PEX request was
> received from a peer, and the entry relative to a peer is removed from this
> map when the peer is disconnected.
>
> It is debatable whether this approach indeed prevents abuse against seed nodes.
### Disconnecting from peers
Seed nodes treat connections with peers as short-term connections, which are
mainly, if not exclusively, intended to exchange peer addresses.
In the case of inbound peers, which have dialed the seed node, the intent of the
connection is achieved once a PEX response is sent to the peer.
The seed node thus disconnects from an inbound peer after sending a `PexAddrs`
message to it.
In the case of outbound peers, which the seed node has dialed for crawling peer
addresses, the intent of the connection is essentially achieved when a PEX
response is received from the peer.
The seed node, however, does not disconnect from a peer after receiving a
selection of peer addresses from it.
As a result, after some rounds of crawling, a seed node will have established
connections to a substantial amount of peers.
To cope with the existence of multiple connections to peers that no longer
serve a purpose for the seed node, the `crawlPeersRoutine` also invokes, after
each round of crawling, the `attemptDisconnects` method.
This method retrieves the list of connected peers from the switch, and
disconnects from peers that are not persistent peers, and with which a
connection is established for more than `SeedDisconnectWaitPeriod`.
This period is a configuration parameter, set to 28 hours when the PEX reactor
is created by the default node constructor.


@@ -1,111 +0,0 @@
# PEX Reactor
The PEX reactor is one of the reactors running in a Tendermint node.
Its implementation is located in the `p2p/pex` package, and it is considered
part of the implementation of the p2p layer.
This document overviews the implementation of the PEX reactor, describing how
the methods from the `Reactor` interface are implemented.
The actual operation of the PEX reactor is presented in documents describing
the roles played by the PEX reactor in the p2p layer:
- [Address Book](./addressbook.md): stores known peer addresses and information
about peers to which the node is connected or has attempted to connect
- [Peer Manager](./peer_manager.md): manages connections established with peers,
defining when a node should dial peers and which peers it should dial
- [Peer Exchange protocol](./pex-protocol.md): enables nodes to exchange peer
addresses, thus implementing a peer discovery service
## OnStart
The `OnStart` method implements `BaseService` and starts the PEX reactor.
The [address book](./addressbook.md), which is a `Service`, is started.
This loads the address book content from disk,
and starts a routine that periodically persists the address book content to disk.
The PEX reactor is configured with the addresses of a number of seed nodes,
the `Seeds` parameter of the `ReactorConfig`.
The addresses of seed nodes are parsed into `NetAddress` instances and resolved
into IP addresses, which is implemented by the `checkSeeds` method.
Valid seed node addresses are stored in the `seedAddrs` field,
and are used by the `dialSeeds` method to contact the configured seed nodes.
The last action is to start one of the following persistent routines, based on
the `SeedMode` configuration parameter:
- Regular nodes run the `ensurePeersRoutine` to check whether the node has
enough outbound peers, dialing peers when necessary
- Seed nodes run the `crawlPeersRoutine` to periodically start a new round
of [crawling](./pex-protocol.md#Crawling-peers) to discover as many peer
addresses as possible
### Errors
Errors encountered when loading the address book from disk are returned,
and prevent the reactor from being started.
An exception is made for the `service.ErrAlreadyStarted` error, which is ignored.
Errors encountered when parsing the configured addresses of seed nodes
are returned and cause the reactor startup to fail.
An exception is made for DNS resolution `ErrNetAddressLookup` errors,
which are not deemed fatal and are only logged as invalid addresses.
If none of the configured seed node addresses is valid, and the loaded address
book is empty, the reactor is not started and an error is returned.
## OnStop
The `OnStop` method implements `BaseService` and stops the PEX reactor.
The address book routine that periodically saves its content to disk is stopped.
## GetChannels
The `GetChannels` method, from the `Reactor` interface, returns the descriptor
of the channel used by the PEX protocol.
The channel ID is `PexChannel` (0), with priority `1`, send queue capacity of
`10`, and maximum message size of `64000` bytes.
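The descriptor roughly looks as follows.
The field names come from the `conn.ChannelDescriptor` type and the values from
the text above, so treat this as an approximation of the actual code (some
versions also set a `MessageType` field).

```go
// Fragment, assuming the usual imports of the p2p/pex package.
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor {
	return []*conn.ChannelDescriptor{
		{
			ID:                  PexChannel, // channel ID 0
			Priority:            1,
			SendQueueCapacity:   10,
			RecvMessageCapacity: 64000, // maximum message size, in bytes
		},
	}
}
```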
## AddPeer
The `AddPeer` method, from the `Reactor` interface,
adds a new peer to the PEX protocol.
If the new peer is an **inbound peer**, i.e., if the peer has dialed the node,
the peer's address is [added to the address book](./addressbook.md#adding-addresses).
Since the peer was authenticated when establishing a secret connection with it,
its self-reported address is trusted, and the peer itself is set as the source of its own address.
In the case of an outbound peer, the node should already have its address in
the address book, as the switch has dialed the peer.
If the peer is an **outbound peer**, i.e., if the node has dialed the peer,
and the PEX protocol needs more addresses,
the node [sends a PEX request](./pex-protocol.md#Requesting-Addresses) to the peer.
The same is not done when inbound peers are added because they are deemed less
trustworthy than outbound peers.
## RemovePeer
The `RemovePeer` method, from the `Reactor` interface,
removes a peer from the PEX protocol.
The peer's ID is removed from the tables tracking PEX requests
[sent](./pex-protocol.md#misbehavior) but not yet replied
and PEX requests [received](./pex-protocol.md#misbehavior-1).
## Receive
The `Receive` method, from the `Reactor` interface,
handles a message received by the PEX protocol.
A node receives two types of messages as part of the PEX protocol:
- `PexRequest`: a request for addresses received from a peer, handled as
described [here](./pex-protocol.md#providing-addresses)
- `PexAddrs`: a list of addresses received from a peer, as a response to a PEX
request sent by the node, as described [here](./pex-protocol.md#responses)


@@ -1,237 +0,0 @@
# Switch
The switch is a core component of the p2p layer.
It manages the procedures for [dialing peers](#dialing-peers) and
[accepting](#accepting-peers) connections from peers, which are actually
implemented by the [transport](./transport.md).
It also manages the reactors, i.e., protocols implemented by the node that
interact with its peers.
Once a connection with a peer is established, the peer is [added](#add-peer) to
the switch and all registered reactors.
Reactors may also instruct the switch to [stop a peer](#stop-peer), namely
disconnect from it.
The switch, in this case, makes sure that the peer is removed from all
registered reactors.
## Dialing peers
Dialing a peer is implemented by the `DialPeerWithAddress` method.
This method is invoked by the [peer manager](./peer_manager.md#ensure-peers)
to dial a peer address and establish a connection with an outbound peer.
The switch keeps a single dialing routine per peer ID.
This is ensured by keeping a synchronized map `dialing` with the IDs of peers
to which the switch is dialing.
A peer ID is added to `dialing` when the `DialPeerWithAddress` method is called
for that peer, and it is removed when the method returns for whatever reason.
The method returns immediately when invoked for a peer whose ID is already in
the `dialing` structure.
The actual dialing is implemented by the [`Dial`](./transport.md#dial) method
of the transport configured for the switch, in the `addOutboundPeerWithConfig`
method.
If the transport succeeds establishing a connection, the returned `Peer` is
added to the switch using the [`addPeer`](#add-peer) method.
This operation can fail, returning an error. In this case, the switch invokes
the transport's [`Cleanup`](./transport.md#cleanup) method to clean any resources
associated with the peer.
If the transport fails to establish a connection with the peer that is configured
as a persistent peer, the switch spawns a routine to [reconnect to the peer](#reconnect-to-peer).
If the peer is already in the `reconnecting` state, the spawned routine has no
effect and returns immediately.
This is in fact a likely scenario, as the `reconnectToPeer` routine relies on
this same `DialPeerWithAddress` method for dialing peers.
### Manual operation
The `DialPeersAsync` method receives a list of peer addresses (strings)
and dials all of them in parallel.
It is invoked in two situations:
- In the [setup](https://github.com/tendermint/tendermint/blob/29c5a062d23aaef653f11195db55c45cd9e02715/node/node.go#L985) of a node, to establish connections with every configured
persistent peer
- In the RPC package, to implement two unsafe RPC commands, not used in production:
[`DialSeeds`](https://github.com/tendermint/tendermint/blob/29c5a062d23aaef653f11195db55c45cd9e02715/rpc/core/net.go#L47) and
[`DialPeers`](https://github.com/tendermint/tendermint/blob/29c5a062d23aaef653f11195db55c45cd9e02715/rpc/core/net.go#L87)
The received list of peer addresses to dial is parsed into `NetAddress` instances.
In case of parsing errors, the method returns. An exception is made for
DNS resolution `ErrNetAddressLookup` errors, which do not interrupt the procedure.
As the peer addresses provided to this method are typically not known by the node,
contrary to the addresses dialed using the `DialPeerWithAddress` method,
they are added to the node's address book, which is persisted to disk.
The switch dials the provided peers in parallel.
The list of peer addresses is randomly shuffled, and for each peer a routine is
spawned.
Each routine sleeps for a random interval, up to 3 seconds, then invokes the
`DialPeerWithAddress` method that actually dials the peer.
### Reconnect to peer
The `reconnectToPeer` method is invoked when a connection attempt to a peer fails,
and the peer is configured as a persistent peer.
The `reconnecting` synchronized map keeps the peers in this state, identified
by their IDs (strings).
This should ensure that a single instance of this method is running at any time.
The peer is kept in this map while this method is running for it: it is set on
the beginning, and removed when the method returns for whatever reason.
If the peer is already in the `reconnecting` state, nothing is done.
The remainder of the method performs multiple connection attempts to the peer,
via the `DialPeerWithAddress` method.
If a connection attempt succeeds, the method returns and the routine finishes.
The same applies when an `ErrCurrentlyDialingOrExistingAddress` error is
returned by the dialing method, as it indicates that the peer is already connected
or that another routine is attempting to (re)connect to it.
A first set of connection attempts is done at (about) regular intervals.
More precisely, between two attempts, the switch waits for an interval of
`reconnectInterval`, hard-coded to 5 seconds, plus a random jitter up to
`dialRandomizerIntervalMilliseconds`, hard-coded to 3 seconds.
At most `reconnectAttempts`, hard-coded to 20, are made using this
regular-interval approach.
A second set of connection attempts is done with exponentially increasing
intervals.
The base interval `reconnectBackOffBaseSeconds` is hard-coded to 3 seconds,
which is also the increasing factor.
The exponentially increasing dialing interval is adjusted as well by a random
jitter up to `dialRandomizerIntervalMilliseconds`.
At most `reconnectBackOffAttempts`, hard-coded to 10, are made using this approach.
> Note: the first sleep interval, to which a random jitter is applied, is 1
> second, not `reconnectBackOffBaseSeconds` seconds, as the first exponent is `0`.
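The waiting intervals of both phases can be sketched as follows, using the
constants named above; the helper is illustrative and only models the sleep
time, not the dialing itself.

```go
package sketch

import (
	"math"
	"math/rand"
	"time"
)

const (
	reconnectInterval                  = 5 * time.Second
	reconnectAttempts                  = 20
	reconnectBackOffBaseSeconds        = 3
	reconnectBackOffAttempts           = 10 // overall cap on backoff attempts
	dialRandomizerIntervalMilliseconds = 3000
)

// waitBeforeAttempt returns how long the switch sleeps before the attempt-th
// (0-based) redial of a persistent peer; callers stop after
// reconnectAttempts + reconnectBackOffAttempts attempts.
func waitBeforeAttempt(attempt int, rng *rand.Rand) time.Duration {
	jitter := time.Duration(rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
	if attempt < reconnectAttempts {
		// First phase: (about) regular intervals.
		return reconnectInterval + jitter
	}
	// Second phase: exponential backoff; the first exponent is 0, so the
	// first backoff sleep is 1 second (plus jitter).
	exponent := attempt - reconnectAttempts
	backoff := math.Pow(reconnectBackOffBaseSeconds, float64(exponent))
	return time.Duration(backoff)*time.Second + jitter
}
```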
## Accepting peers
The `acceptRoutine` method is a persistent routine that handles connections
accepted by the transport configured for the switch.
The [`Accept`](./transport.md#accept) method of the configured transport
returns a `Peer` with which an inbound connection was established.
The switch accepts a new peer if the maximum number of inbound peers was not
reached, or if the peer was configured as an _unconditional peer_.
The maximum number of inbound peers is determined by the `MaxNumInboundPeers`
configuration parameter, whose default value is `40`.
If accepted, the peer is added to the switch using the [`addPeer`](#add-peer) method.
If the switch does not accept the established incoming connection, or if the
`addPeer` method returns an error, the switch invokes the transport's
[`Cleanup`](./transport.md#cleanup) method to clean any resources associated
with the peer.
The transport's `Accept` method can also return a number of errors.
Errors of `ErrRejected` or `ErrFilterTimeout` types are ignored,
an `ErrTransportClosed` error causes the accept routine to be interrupted,
while other errors cause the routine to panic.
> TODO: which errors can cause the routine to panic?
## Add peer
The `addPeer` method adds a peer to the switch,
either after dialing (by `addOutboundPeerWithConfig`, called by `DialPeerWithAddress`)
a peer and establishing an outbound connection,
or after accepting (`acceptRoutine`) a peer and establishing an inbound connection.
The first step is to invoke the `filterPeer` method.
It checks whether the peer is already in the set of connected peers,
and whether any of the configured `peerFilter` methods reject the peer.
If the peer is already present or it is rejected by any filter, the `addPeer`
method fails and returns an error.
Then, the new peer is started, added to the set of connected peers, and added
to all reactors.
More precisely, the new peer's information is first provided to every
reactor (`InitPeer` method).
Next, the peer's sending and receiving routines are started, and the peer is
added to the set of connected peers.
These two operations can fail, causing `addPeer` to return an error.
Then, in the absence of previous errors, the peer is added to every reactor (`AddPeer` method).
> Adding the peer to the peer set returns an `ErrSwitchDuplicatePeerID` error
> when a peer with the same ID is already present.
>
> TODO: Starting a peer could be reduced as starting the MConn with that peer?
## Stop peer
There are two methods for stopping a peer, namely disconnecting from it, and
removing it from the table of connected peers.
The `StopPeerForError` method is invoked to stop a peer due to an external
error, which is provided to the method as a generic "reason".
The `StopPeerGracefully` method stops a peer in the absence of errors or, more
precisely, without providing any "reason" for that to the switch.
In both cases the `Peer` instance is stopped, the peer is removed from all
registered reactors, and finally from the list of connected peers.
> Issue https://github.com/tendermint/tendermint/issues/3338 is mentioned in
> the internal `stopAndRemovePeer` method explaining why removing the peer from
> the list of connected peers is the last action taken.
When there is a "reason" for stopping the peer (`StopPeerForError` method)
and the peer is a persistent peer, the method creates a routine to attempt
reconnecting to the peer address, using the `reconnectToPeer` method.
If the peer is an outbound peer, the peer's address is known, since the switch
has dialed the peer.
Otherwise, the peer address is retrieved from the `NodeInfo` instance from the
connection handshake.
## Add reactor
The `AddReactor` method registers a `Reactor` with the switch.
The reactor is associated with the set of channel IDs it employs.
Two reactors (in the same node) cannot share the same channel ID.
There is a callback to the reactor, in which the switch passes itself to the
reactor.
## Remove reactor
The `RemoveReactor` method unregisters a `Reactor` from the switch.
The reactor is disassociated from the set of channel IDs it employs.
There is a callback to the reactor, in which the switch passes `nil` to the
reactor.
## OnStart
This is a `BaseService` method.
All registered reactors are started.
The switch's `acceptRoutine` is started.
## OnStop
This is a `BaseService` method.
All (connected) peers are stopped and removed from the list of peers using the
`stopAndRemovePeer` method.
All registered reactors are stopped.
## Broadcast
This method broadcasts a message on a channel, by sending the message in
parallel to all connected peers.
The method spawns a routine for each connected peer, invoking the `Send` method
provided by each `Peer` instance with the provided message and channel ID.
The return values (booleans) of these calls are redirected to a channel that is
returned by the method.
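A sketch of this fan-out, assuming a minimal `Peer` interface with a
byte-slice `Send` method; the actual switch iterates over its internal peer set,
and newer versions use an envelope-based `Send` signature.

```go
package sketch

import "sync"

// Peer is a minimal stand-in for the switch's peer abstraction.
type Peer interface {
	Send(chID byte, msg []byte) bool
}

// broadcast sends msg on channel chID to all peers in parallel and returns a
// channel carrying the result of each Send call.
func broadcast(peers []Peer, chID byte, msg []byte) chan bool {
	results := make(chan bool, len(peers))
	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		go func(p Peer) {
			defer wg.Done()
			results <- p.Send(chID, msg) // true if the message was enqueued
		}(p)
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}
```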
> TODO: detail where this method is invoked:
> - By the consensus protocol, in `broadcastNewRoundStepMessage`,
> `broadcastNewValidBlockMessage`, and `broadcastHasVoteMessage`
> - By the state sync protocol


@@ -1,222 +0,0 @@
# Transport
The transport establishes secure and authenticated connections with peers.
The transport [`Dial`](#dial)s peer addresses to establish outbound connections,
and [`Listen`](#listen)s in a configured network address
to [`Accept`](#accept) inbound connections from peers.
The transport establishes raw TCP connections with peers
and [upgrades](#connection-upgrade) them into authenticated secret connections.
The established secret connection is then wrapped into a `Peer` instance, which
is returned to the caller, typically the [switch](./switch.md).
## Dial
The `Dial` method is used by the switch to establish an outbound connection with a peer.
It is a synchronous method, which blocks until a connection is established or an error occurs.
The method returns an outbound `Peer` instance wrapping the established connection.
The transport first dials the provided peer's address to establish a raw TCP connection.
The dialing maximum duration is determined by `dialTimeout`, hard-coded to 1 second.
The established raw connection is then submitted to a set of [filters](#connection-filtering),
which can reject it.
If the connection is not rejected, it is recorded in the table of established connections.
The established raw TCP connection is then [upgraded](#connection-upgrade) into
an authenticated secret connection.
This procedure should ensure, in particular, that the public key of the remote peer
matches the ID of the dialed peer, which is part of the peer address provided to this method.
In the absence of errors,
the established secret connection (`conn.SecretConnection` type)
and the information about the peer (`NodeInfo` record) retrieved and verified
during the version handshake,
are wrapped into an outbound `Peer` instance and returned to the switch.
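The sequence of steps can be summarized by the following sketch.
It is not the actual method: the types and helper names (`filterConn`,
`upgrade`, `wrapPeer`) follow the descriptions in this document, and the error
handling around the table of established connections is omitted.

```go
// Fragment, assuming the types of the p2p package.
func (mt *MultiplexTransport) dialSketch(addr NetAddress, cfg peerConfig) (Peer, error) {
	c, err := addr.DialTimeout(mt.dialTimeout) // raw TCP connection, 1s timeout
	if err != nil {
		return nil, err
	}
	if err := mt.filterConn(c); err != nil { // duplicate connections and configured filters
		return nil, err
	}
	secretConn, nodeInfo, err := mt.upgrade(c, &addr) // secret connection + version handshake
	if err != nil {
		return nil, err
	}
	return mt.wrapPeer(secretConn, nodeInfo, cfg, &addr), nil
}
```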
## Listen
The `Listen` method produces a TCP listener instance for the provided network
address, and spawns an `acceptPeers` routine to handle the raw connections
accepted by the listener.
The `NetAddress` method exports the listen address configured for the transport.
The maximum number of simultaneous incoming connections accepted by the listener
is bound to `MaxNumInboundPeers` plus the configured number of unconditional peers,
using the `MultiplexTransportMaxIncomingConnections` option,
in the node [initialization](https://github.com/tendermint/tendermint/blob/29c5a062d23aaef653f11195db55c45cd9e02715/node/node.go#L563).
This method is called when a node is [started](https://github.com/tendermint/tendermint/blob/29c5a062d23aaef653f11195db55c45cd9e02715/node/node.go#L972).
In case of errors, the `acceptPeers` routine is not started and the error is returned.
## Accept
The `Accept` method returns to the switch inbound connections established with a peer.
It is a synchronous method, which blocks until a connection is accepted or an error occurs.
The method returns an inbound `Peer` instance wrapping the established connection.
The transport handles incoming connections in the `acceptPeers` persistent routine.
This routine is started by the [`Listen`](#listen) method
and accepts raw connections from a TCP listener.
A new routine is spawned for each accepted connection.
The raw connection is submitted to a set of [filters](#connection-filtering),
which can reject it.
If the connection is not rejected, it is recorded in the table of established connections.
The established raw TCP connection is then [upgraded](#connection-upgrade) into
an authenticated secret connection.
The established secret connection (`conn.SecretConnection` type),
the information about the peer (`NodeInfo` record) retrieved and verified
during the version handshake,
as well as any error returned in this process, are added to a queue of accepted connections.
This queue is consumed by the `Accept` method.
> Handling accepted connection asynchronously was introduced due to this issue:
> https://github.com/tendermint/tendermint/issues/2047
## Connection Filtering
The `filterConn` method is invoked for every new raw connection established by the transport.
Its main goal is to prevent the transport from maintaining duplicated connections with the same peer.
It also runs a set of configured connection filters.
The transport keeps a table `conns` of established connections.
The table maps the remote address of each connection to the list of
IP addresses into which the connection's remote address is resolved.
If the remote address of the new connection is already present in the table,
the connection is rejected.
Otherwise, the connection's remote address is resolved into a list of IPs,
which are recorded in the established connections table.
The connection and the resolved IPs are then passed through a set of connection filters,
configured via the `MultiplexTransportConnFilters` transport option.
The maximum duration for the filters execution, which is performed in parallel,
is determined by `filterTimeout`.
Its default value is 5 seconds,
which can be changed using the `MultiplexTransportFilterTimeout` transport option.
If the connection and the resolved remote addresses are not filtered out,
the transport registers them into the `conns` table and returns.
In case of errors, the connection is removed from the table of established
connections and closed.
### Errors
If the address of the new connection is already present in the `conns` table,
an `ErrRejected` error with the `isDuplicate` reason is returned.
If the IP resolution of the connection's remote address fails,
an `AddrError` or `DNSError` error is returned.
If any of the filters reject the connection,
an `ErrRejected` error with the `isRejected` reason is returned.
If the filters execution times out,
an `ErrFilterTimeout` error is returned.
## Connection Upgrade
The `upgrade` method is invoked for every new raw connection established by the
transport that was not [filtered out](#connection-filtering).
It upgrades an established raw TCP connection into a secret authenticated
connection, and validates the information provided by the peer.
This is a complex procedure, that can be summarized by the following three
message exchanges between the node and the new peer:
1. Encryption: the nodes produce ephemeral key pairs and exchange ephemeral
public keys, from which are derived: (i) a pair of secret keys used to
encrypt the data exchanged between the nodes, and (ii) a challenge message.
1. Authentication: the nodes exchange their persistent public keys and a
signature of the challenge message produced with their persistent
private keys. This allows validating the peer's persistent public key,
which plays the role of node ID.
1. Version handshake: nodes exchange and validate each other's `NodeInfo` records.
These records contain, among other fields, their node IDs, the network/chain
ID they are part of, and the list of supported channel IDs.
Steps (1) and (2) are implemented in the `conn` package.
In case of success, they produce the secret connection that is actually used by
the node to communicate with the peer.
An overview of this procedure, which implements the station-to-station (STS)
[protocol][sts-paper] ([PDF][sts-paper-pdf]), can be found [here][peer-sts].
The maximum duration for establishing a secret connection with the peer is
defined by `handshakeTimeout`, hard-coded to 3 seconds.
The established secret connection stores the persistent public key of the peer,
which has been validated via the challenge authentication of step (2).
If the connection being upgraded is an outbound connection, i.e., if the node has
dialed the peer, the dialed peer's ID is compared to the node ID derived from the
peer's persistent public key: if they do not match, the connection is rejected.
This verification is not performed in the case of inbound (accepted) connections,
as the node does not know a priori the remote node's ID.
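
A minimal sketch of this verification, assuming the `PubKeyToID` helper and the
`SecretConnection` API of the `v0.34.x` release; the `verifyDialedID` function
itself is hypothetical:

```go
package p2pexample

import (
	"fmt"

	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
)

// verifyDialedID is a hypothetical helper: for outbound connections, the ID
// that was dialed must match the ID derived from the public key authenticated
// by the secret connection.
func verifyDialedID(sc *conn.SecretConnection, dialedID p2p.ID) error {
	if connID := p2p.PubKeyToID(sc.RemotePubKey()); connID != dialedID {
		return fmt.Errorf("connection ID %v does not match dialed ID %v", connID, dialedID)
	}
	return nil
}
```
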
Step (3), the version handshake, is performed by the transport.
Its maximum duration is also defined by `handshakeTimeout`, hard-coded to 3 seconds.
The version handshake retrieves the `NodeInfo` record of the new peer,
which can be rejected for multiple reasons, listed [here][peer-handshake].
If the connection upgrade succeeds, the method returns the established secret
connection, an instance of `conn.SecretConnection` type,
and the `NodeInfo` record of the peer.
In case of errors, the connection is removed from the table of established
connections and closed.
### Errors
The timeouts for steps (1) and (2), and for step (3), are configured as the
deadline for operations on the TCP connection that is being upgraded.
If this deadline is reached, the connection produces an
`os.ErrDeadlineExceeded` error, returned by the corresponding step.
Any error produced when establishing a secret connection with the peer (steps 1 and 2) or
during the version handshake (step 3), including timeouts,
is encapsulated into an `ErrRejected` error with reason `isAuthFailure` and returned.
If the upgraded connection is an outbound connection, and the peer ID learned in step (2)
does not match the dialed peer's ID,
an `ErrRejected` error with reason `isAuthFailure` is returned.
If the peer's `NodeInfo` record, retrieved in step (3), is invalid,
or if it reports a node ID that does not match the peer ID learned in step (2),
an `ErrRejected` error with reason `isAuthFailure` is returned.
If it reports a node ID equal to the local node ID,
an `ErrRejected` error with reason `isSelf` is returned.
If it is not compatible with the local `NodeInfo`,
an `ErrRejected` error with reason `isIncompatible` is returned.
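
Callers of the transport typically need to tell these rejection reasons apart.
A minimal sketch, assuming the reason-inspection methods that `ErrRejected`
exposes in `p2p/errors.go` (the `classifyRejection` helper is hypothetical):

```go
package p2pexample

import "github.com/tendermint/tendermint/p2p"

// classifyRejection is a hypothetical helper mapping an ErrRejected returned
// by the transport to a human-readable reason.
func classifyRejection(err error) string {
	e, ok := err.(p2p.ErrRejected)
	if !ok {
		return "not a rejection"
	}
	switch {
	case e.IsDuplicate():
		return "duplicate connection"
	case e.IsFiltered():
		return "filtered out"
	case e.IsAuthFailure():
		return "authentication failure"
	case e.IsSelf():
		return "connection to self"
	case e.IsIncompatible():
		return "incompatible NodeInfo"
	default:
		return "rejected"
	}
}
```
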
## Close
The `Close` method closes the TCP listener created by the `Listen` method,
and sends a signal for interrupting the `acceptPeers` routine.
This method is called when a node is [stopped](https://github.com/tendermint/tendermint/blob/46badfabd9d5491c78283a0ecdeb695e21785508/node/node.go#L1019).
## Cleanup
The `Cleanup` method receives a `Peer` instance,
and removes the connection established with that peer from the table of established connections.
It also invokes the `Peer` interface method that closes the connection associated with the peer.
It is invoked when the connection with a peer is closed.
## Supported channels
The `AddChannel` method registers a channel in the transport.
The channel ID is added to the list of supported channel IDs,
stored in the local `NodeInfo` record.
The `NodeInfo` record is exchanged with peers in the version handshake.
For this reason, this method is not expected to be invoked once the transport has been started.
> The only call to this method is performed in the `CustomReactors` constructor
> option of a node, i.e., before the node is started.
> Note that the default list of supported channel IDs, namely those of the default reactors,
> is provided to the transport as part of its original `NodeInfo` record.
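
Following the note above, a sketch of how a custom reactor (and thus its channel IDs)
is registered before the node starts.
It mirrors the `NewNode` call used elsewhere in the code base;
the custom reactor and the surrounding helper are hypothetical.

```go
package p2pexample

import (
	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/node"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
)

// newNodeWithCustomReactor is a hypothetical helper: the CustomReactors option
// registers the reactor's channels with the transport before the node starts,
// so they are advertised in the NodeInfo exchanged during the version handshake.
func newNodeWithCustomReactor(
	conf *cfg.Config,
	nodeKey *p2p.NodeKey,
	app abci.Application,
	custom p2p.Reactor, // hypothetical custom reactor
	logger log.Logger,
) (*node.Node, error) {
	return node.NewNode(
		conf,
		privval.LoadOrGenFilePV(conf.PrivValidatorKeyFile(), conf.PrivValidatorStateFile()),
		nodeKey,
		proxy.NewLocalClientCreator(app),
		node.DefaultGenesisDocProviderFunc(conf),
		node.DefaultDBProvider,
		node.DefaultMetricsProvider(conf.Instrumentation),
		logger,
		node.CustomReactors(map[string]p2p.Reactor{"CUSTOM": custom}),
	)
}
```
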
[peer-sts]: https://github.com/tendermint/tendermint/blob/main/spec/p2p/peer.md#authenticated-encryption-handshake
[peer-handshake]: https://github.com/tendermint/tendermint/blob/main/spec/p2p/peer.md#tendermint-version-handshake
[sts-paper]: https://link.springer.com/article/10.1007/BF00124891
[sts-paper-pdf]: https://github.com/tendermint/tendermint/blob/0.1/docs/sts-final.pdf

View File

@@ -1,239 +0,0 @@
# Types adopted in the p2p implementation
This document lists the packages and source files, excluding test units, that
implement the p2p layer, and summarizes the main types they implement.
Types play the role of classes in Go.
The reference version for this documentation is the branch
[`v0.34.x`](https://github.com/tendermint/tendermint/tree/v0.34.x/p2p).
State of August 2022.
## Package `p2p`
Implementation of the p2p layer of Tendermint.
### `base_reactor.go`
`Reactor` interface.
`BaseReactor` implements `Reactor`.
**Not documented yet**.
### `conn_set.go`
`ConnSet` interface, a "lookup table for connections and their ips".
Internal type `connSet` implements the `ConnSet` interface.
Used by the [transport](#transportgo) to store connected peers.
### `errors.go`
Defines several error types.
`ErrRejected` enumerates a number of reasons for which a peer can be rejected.
Mainly produced by the [transport](#transportgo),
but also by the [switch](#switchgo).
`ErrSwitchDuplicatePeerID` is produced by the `PeerSet` used by the [switch](#switchgo).
`ErrSwitchConnectToSelf` is handled by the [switch](#switchgo),
but currently is not produced outside tests.
`ErrSwitchAuthenticationFailure` is handled by the [PEX reactor](#pex_reactorgo),
but currently is not produced outside tests.
`ErrTransportClosed` is produced by the [transport](#transportgo)
and handled by the [switch](#switchgo).
`ErrNetAddressNoID`, `ErrNetAddressInvalid`, and `ErrNetAddressLookup`
are produced when parsing a string to create an instance of `NetAddress`.
They can be returned during the setup of the [switch](#switchgo)
and of the [PEX reactor](#pex_reactorgo),
as well as when the [transport](#transportgo) validates a `NodeInfo` as part of
the connection handshake.
`ErrCurrentlyDialingOrExistingAddress` is produced by the [switch](#switchgo),
and handled by the switch and the [PEX reactor](#pex_reactorgo).
### `fuzz.go`
For testing purposes.
`FuzzedConnection` wraps a `net.Conn` and injects random delays.
### `key.go`
`NodeKey` is the persistent key of a node, namely its private key.
The `ID` of a node is a string representing the node's public key.
### `metrics.go`
Prometheus `Metrics` exposed by the p2p layer.
### `netaddress.go`
Type `NetAddress` contains the `ID` and the network address (IP and port) of a node.
The API of the [address book](#addrbookgo) receives and returns `NetAddress` instances.
This source file was adapted from [`btcd`](https://github.com/btcsuite/btcd),
a Go implementation of Bitcoin.
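
For reference, a `NetAddress` is typically built from a string of the form `<ID>@<IP>:<port>`.
A minimal sketch, assuming the `NewNetAddressString` constructor of this package
(the ID, the address, and the `parseAddress` helper are made up for illustration):

```go
package p2pexample

import (
	"fmt"

	"github.com/tendermint/tendermint/p2p"
)

// parseAddress is a hypothetical helper parsing an "<ID>@<IP>:<port>" string.
// Parsing failures surface as ErrNetAddressNoID, ErrNetAddressInvalid, or
// ErrNetAddressLookup, defined in errors.go.
func parseAddress() (*p2p.NetAddress, error) {
	addr, err := p2p.NewNetAddressString(
		"deadbeefdeadbeefdeadbeefdeadbeefdeadbeef@203.0.113.10:26656")
	if err != nil {
		return nil, err
	}
	fmt.Println(addr.ID, addr.IP, addr.Port)
	return addr, nil
}
```
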
### `node_info.go`
Interface `NodeInfo` stores the basic information about a node exchanged with a
peer during the handshake.
It is implemented by `DefaultNodeInfo` type.
The [switch](#switchgo) stores the local `NodeInfo`.
The `NodeInfo` of connected peers is produced by the
[transport](#transportgo) during the handshake, and stored in [`Peer`](#peergo) instances.
### `peer.go`
Interface `Peer` represents a connected peer.
It is implemented by the internal `peer` type.
The [transport](#transportgo) API methods return `Peer` instances,
wrapping established secure connections with peers.
The [switch](#switchgo) API methods receive `Peer` instances.
The switch stores connected peers in a `PeerSet`.
The [`Reactor`](#base_reactorgo) methods, invoked by the switch, receive `Peer` instances.
### `peer_set.go`
Interface `IPeerSet` offers methods to access a table of [`Peer`](#peergo) instances.
Type `PeerSet` implements a thread-safe table of [`Peer`](#peergo) instances,
used by the [switch](#switchgo).
The switch provides limited access to this table by returning an `IPeerSet`
instance, used by the [PEX reactor](#pex_reactorgo).
### `switch.go`
Documented in [switch](./switch.md).
The `Switch` implements the [peer manager](./peer_manager.md) role for inbound peers.
[`Reactor`](#base_reactorgo)s have access to the `Switch` and may invoke its methods.
This includes the [PEX reactor](#pex_reactorgo).
### `transport.go`
Documented in [transport](./transport.md).
The `Transport` interface is implemented by `MultiplexTransport`.
The [switch](#switchgo) contains a `Transport` and uses it to establish
connections with peers.
### `types.go`
Aliases for p2p's `conn` package types.
## Package `p2p.conn`
Implements the connection between Tendermint nodes,
which is encrypted, authenticated, and multiplexed.
### `connection.go`
Implements the `MConnection` type and the `Channel` abstraction.
A `MConnection` multiplexes a generic network connection (`net.Conn`) into
multiple independent `Channel`s, used by different [`Reactor`](#base_reactorgo)s.
A [`Peer`](#peergo) stores the `MConnection` instance used to interact with a
peer, which multiplexes a [`SecretConnection`](#secret_connectiongo).
### `conn_go110.go`
Support for Go 1.10.
### `secret_connection.go`
Implements the `SecretConnection` type, which is an encrypted authenticated
connection built atop a raw network (TCP) connection.
A [`Peer`](#peergo) stores the `SecretConnection` established by the transport,
which is the underlying connection multiplexed by [`MConnection`](#connectiongo).
As briefly documented in the [transport](./transport.md#Connection-Upgrade),
a `SecretConnection` implements the Station-To-Station (STS) protocol.
The `SecretConnection` type implements the `net.Conn` interface,
which is a generic network connection.
## Package `p2p.mock`
Mock implementations of [`Peer`](#peergo) and [`Reactor`](#base_reactorgo) interfaces.
## Package `p2p.mocks`
Code generated by `mockery`.
## Package `p2p.pex`
Implementation of the [PEX reactor](./pex.md).
### `addrbook.go`
Documented in [address book](./addressbook.md).
This source file was adapted from [`btcd`](https://github.com/btcsuite/btcd),
a Go implementation of Bitcoin.
### `errors.go`
A number of errors produced and handled by the [address book](#addrbookgo).
`ErrAddrBookNilAddr` is produced by the address book, but handled (logged) by
the [PEX reactor](#pex_reactorgo).
`ErrUnsolicitedList` is produced and handled by the [PEX protocol](#pex_reactorgo).
### `file.go`
Implements the [address book](#addrbookgo) persistence.
### `known_address.go`
Type `knownAddress` represents an address stored in the [address book](#addrbookgo).
### `params.go`
Constants used by the [address book](#addrbookgo).
### `pex_reactor.go`
Implementation of the [PEX reactor](./pex.md), which is a [`Reactor`](#base_reactorgo).
This includes the implementation of the [PEX protocol](./pex-protocol.md)
and of the [peer manager](./peer_manager.md) role for outbound peers.
The PEX reactor also manages an [address book](#addrbookgo) instance.
## Package `p2p.trust`
Go documentation of `Metric` type:
> // Metric - keeps track of peer reliability
> // See tendermint/docs/architecture/adr-006-trust-metric.md for details
Not imported by any other Tendermint source file.
## Package `p2p.upnp`
This package implementation was taken from "taipei-torrent".
It is used by the `probe-upnp` command of the Tendermint binary.

View File

@@ -81,6 +81,9 @@ type State struct {
// Copy makes a copy of the State for mutating.
func (state State) Copy() State {
if state.IsEmpty() {
return State{}
}
return State{
Version: state.Version,

30
statesync/metrics.gen.go Normal file
View File

@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.
package statesync
import (
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
Syncing: discard.NewGauge(),
}
}

19
statesync/metrics.go Normal file
View File

@@ -0,0 +1,19 @@
package statesync
import (
"github.com/go-kit/kit/metrics"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "statesync"
)
//go:generate go run ../scripts/metricsgen -struct=Metrics
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is state syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}

View File

@@ -33,6 +33,7 @@ type Reactor struct {
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
tempDir string
metrics *Metrics
// This will only be set when a state sync is in progress. It is used to feed received
// snapshots and chunks into the sync.
@@ -46,12 +47,14 @@ func NewReactor(
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
tempDir string,
metrics *Metrics,
) *Reactor {
r := &Reactor{
cfg: cfg,
conn: conn,
connQuery: connQuery,
metrics: metrics,
}
r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
@@ -92,6 +95,13 @@ func (r *Reactor) AddPeer(peer p2p.Peer) {
}
}
// IsSyncing returns true if state sync is actively being used to restore a state
func (r *Reactor) IsSyncing() bool {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.syncer != nil
}
// RemovePeer implements p2p.Reactor.
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
r.mtx.RLock()
@@ -265,6 +275,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
}
r.metrics.Syncing.Add(1)
r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
r.mtx.Unlock()
@@ -284,6 +295,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.mtx.Lock()
r.syncer = nil
r.metrics.Syncing.Add(0)
r.mtx.Unlock()
return state, commit, err
}

View File

@@ -71,7 +71,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
cfg := config.DefaultStateSyncConfig()
r := NewReactor(*cfg, conn, nil, "")
r := NewReactor(*cfg, conn, nil, "", NopMetrics())
err := r.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -161,7 +161,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
cfg := config.DefaultStateSyncConfig()
r := NewReactor(*cfg, conn, nil, "")
r := NewReactor(*cfg, conn, nil, "", NopMetrics())
err := r.Start()
require.NoError(t, err)
t.Cleanup(func() {

View File

@@ -21,6 +21,7 @@ const appVersion = 1
// Application is an ABCI application for use by end-to-end tests. It is a
// simple key/value store for strings, storing data in memory and persisting
// to disk as JSON, taking state sync snapshots if requested.
type Application struct {
abci.BaseApplication
logger log.Logger
@@ -88,7 +89,7 @@ func DefaultConfig(dir string) *Config {
}
// NewApplication creates the application.
func NewApplication(cfg *Config) (abci.Application, error) {
func NewApplication(cfg *Config) (*Application, error) {
state, err := NewState(cfg.Dir, cfg.PersistInterval)
if err != nil {
return nil, err
@@ -270,8 +271,7 @@ func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) a
}
func (app *Application) PrepareProposal(
req abci.RequestPrepareProposal,
) abci.ResponsePrepareProposal {
req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
txs := make([][]byte, 0, len(req.Txs))
var totalBytes int64
for _, tx := range req.Txs {

View File

@@ -1,111 +0,0 @@
package app
import (
"sync"
abci "github.com/tendermint/tendermint/abci/types"
)
// SyncApplication wraps an Application, managing its own synchronization. This
// allows it to be called from an unsynchronized local client, as it is
// implemented in a thread-safe way.
type SyncApplication struct {
mtx sync.RWMutex
app *Application
}
var _ abci.Application = (*SyncApplication)(nil)
func NewSyncApplication(cfg *Config) (abci.Application, error) {
app, err := NewApplication(cfg)
if err != nil {
return nil, err
}
return &SyncApplication{
app: app.(*Application),
}, nil
}
func (app *SyncApplication) Info(req abci.RequestInfo) abci.ResponseInfo {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.Info(req)
}
func (app *SyncApplication) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.InitChain(req)
}
func (app *SyncApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.CheckTx(req)
}
func (app *SyncApplication) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
// app.app.PrepareProposal does not modify state
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.PrepareProposal(req)
}
func (app *SyncApplication) ProcessProposal(req abci.RequestProcessProposal) abci.ResponseProcessProposal {
// app.app.ProcessProposal does not modify state
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.ProcessProposal(req)
}
func (app *SyncApplication) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.DeliverTx(req)
}
func (app *SyncApplication) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.BeginBlock(req)
}
func (app *SyncApplication) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.EndBlock(req)
}
func (app *SyncApplication) Commit() abci.ResponseCommit {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.Commit()
}
func (app *SyncApplication) Query(req abci.RequestQuery) abci.ResponseQuery {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.Query(req)
}
func (app *SyncApplication) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.ApplySnapshotChunk(req)
}
func (app *SyncApplication) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots {
// Calls app.snapshots.List(), which is thread-safe.
return app.app.ListSnapshots(req)
}
func (app *SyncApplication) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
// Calls app.snapshots.LoadChunk, which is thread-safe.
return app.app.LoadSnapshotChunk(req)
}
func (app *SyncApplication) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.OfferSnapshot(req)
}

View File

@@ -105,7 +105,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// First we generate seed nodes, starting at the initial height.
for i := 1; i <= numSeeds; i++ {
manifest.Nodes[fmt.Sprintf("seed%02d", i)] = generateNode(
r, e2e.ModeSeed, false, 0, manifest.InitialHeight, false)
r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
}
// Next, we generate validators. We make sure a BFT quorum of validators start
@@ -120,12 +120,8 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
nextStartAt += 5
}
name := fmt.Sprintf("validator%02d", i)
syncApp := false
if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
syncApp = r.Intn(100) >= 50
}
manifest.Nodes[name] = generateNode(
r, e2e.ModeValidator, syncApp, startAt, manifest.InitialHeight, i <= 2)
r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)
if startAt == 0 {
(*manifest.Validators)[name] = int64(30 + r.Intn(71))
@@ -153,12 +149,8 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
startAt = nextStartAt
nextStartAt += 5
}
syncApp := false
if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
syncApp = r.Intn(100) >= 50
}
manifest.Nodes[fmt.Sprintf("full%02d", i)] = generateNode(
r, e2e.ModeFull, syncApp, startAt, manifest.InitialHeight, false)
r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
}
// We now set up peer discovery for nodes. Seed nodes are fully meshed with
@@ -221,11 +213,10 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// here, since we need to know the overall network topology and startup
// sequencing.
func generateNode(
r *rand.Rand, mode e2e.Mode, syncApp bool, startAt int64, initialHeight int64, forceArchive bool,
r *rand.Rand, mode e2e.Mode, startAt int64, initialHeight int64, forceArchive bool,
) *e2e.ManifestNode {
node := e2e.ManifestNode{
Mode: string(mode),
SyncApp: syncApp,
StartAt: startAt,
Database: nodeDatabases.Choose(r).(string),
PrivvalProtocol: nodePrivvalProtocols.Choose(r).(string),

View File

@@ -60,7 +60,6 @@ perturb = ["kill"]
persistent_peers = ["validator01"]
database = "rocksdb"
abci_protocol = "builtin"
sync_app = true
perturb = ["pause"]
[node.validator05]

View File

@@ -7,17 +7,15 @@ import (
"github.com/BurntSushi/toml"
"github.com/tendermint/tendermint/test/e2e/app"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// Config is the application configuration.
type Config struct {
ChainID string `toml:"chain_id"`
Listen string `toml:"listen"`
Protocol string `toml:"protocol"`
Dir string `toml:"dir"`
ChainID string `toml:"chain_id"`
Listen string
Protocol string
Dir string
Mode string `toml:"mode"`
SyncApp bool `toml:"sync_app"`
PersistInterval uint64 `toml:"persist_interval"`
SnapshotInterval uint64 `toml:"snapshot_interval"`
RetainBlocks uint64 `toml:"retain_blocks"`
@@ -64,10 +62,6 @@ func (cfg Config) Validate() error {
return errors.New("chain_id parameter is required")
case cfg.Listen == "" && cfg.Protocol != "builtin":
return errors.New("listen parameter is required")
case cfg.SyncApp && cfg.Protocol != string(e2e.ProtocolBuiltin):
return errors.New("sync_app parameter is only relevant for builtin applications")
case cfg.SyncApp && cfg.Mode != string(e2e.ModeFull) && cfg.Mode != string(e2e.ModeValidator):
return errors.New("sync_app parameter is only relevant to full nodes and validators")
default:
return nil
}

View File

@@ -113,22 +113,9 @@ func startApp(cfg *Config) error {
//
// FIXME There is no way to simply load the configuration from a file, so we need to pull in Viper.
func startNode(cfg *Config) error {
var cc proxy.ClientCreator
if cfg.SyncApp {
app, err := app.NewSyncApplication(cfg.App())
if err != nil {
return err
}
cc = proxy.NewUnsyncLocalClientCreator(app)
logger.Info("Using synchronized app with unsynchronized local client")
} else {
app, err := app.NewApplication(cfg.App())
if err != nil {
return err
}
cc = proxy.NewLocalClientCreator(app)
logger.Info("Using regular app with synchronized (regular) local client")
app, err := app.NewApplication(cfg.App())
if err != nil {
return err
}
tmcfg, nodeLogger, nodeKey, err := setupNode()
@@ -139,7 +126,7 @@ func startNode(cfg *Config) error {
n, err := node.NewNode(tmcfg,
privval.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
nodeKey,
cc,
proxy.NewLocalClientCreator(app),
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),

View File

@@ -77,15 +77,6 @@ type ManifestNode struct {
// is generated), and seed nodes run in seed mode with the PEX reactor enabled.
Mode string `toml:"mode"`
// SyncApp specifies whether this node should use a synchronized application
// with an unsynchronized local client. By default this is `false`, meaning
// that the node will run an unsynchronized application with a synchronized
// local client.
//
// Only applies to validators and full nodes where their ABCI protocol is
// "builtin".
SyncApp bool `toml:"sync_app"`
// Seeds is the list of node names to use as P2P seed nodes. Defaults to none.
Seeds []string `toml:"seeds"`

View File

@@ -74,7 +74,6 @@ type Node struct {
Name string
Testnet *Testnet
Mode Mode
SyncApp bool // Should we use a synchronized app with an unsynchronized local client?
PrivvalKey crypto.PrivKey
NodeKey crypto.PrivKey
IP net.IP
@@ -155,7 +154,6 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
IP: ind.IPAddress,
ProxyPort: proxyPortGen.Next(),
Mode: ModeValidator,
SyncApp: nodeManifest.SyncApp,
Database: "goleveldb",
ABCIProtocol: Protocol(testnet.ABCIProtocol),
PrivvalProtocol: ProtocolFile,

View File

@@ -258,7 +258,6 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"sync_app": node.SyncApp,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,

View File

@@ -21,7 +21,7 @@ func FuzzRPCJSONRPCServer(f *testing.F) {
I int `json:"i"`
}
var rpcFuncMap = map[string]*rpcserver.RPCFunc{
"c": rpcserver.NewRPCFunc(func(ctx *rpctypes.Context, args *args, options ...rpcserver.Option) (string, error) {
"c": rpcserver.NewRPCFunc(func(ctx *rpctypes.Context, args *args) (string, error) {
return "foo", nil
}, "args"),
}

View File

@@ -101,7 +101,7 @@ func (voteSet *VoteSet) ChainID() string {
// Implements VoteSetReader.
func (voteSet *VoteSet) GetHeight() int64 {
if voteSet == nil {
if voteSet.IsEmpty() {
return 0
}
return voteSet.height
@@ -109,7 +109,7 @@ func (voteSet *VoteSet) GetHeight() int64 {
// Implements VoteSetReader.
func (voteSet *VoteSet) GetRound() int32 {
if voteSet == nil {
if voteSet.IsEmpty() {
return -1
}
return voteSet.round
@@ -117,7 +117,7 @@ func (voteSet *VoteSet) GetRound() int32 {
// Implements VoteSetReader.
func (voteSet *VoteSet) Type() byte {
if voteSet == nil {
if voteSet.IsEmpty() {
return 0x00
}
return byte(voteSet.signedMsgType)
@@ -125,12 +125,16 @@ func (voteSet *VoteSet) Type() byte {
// Implements VoteSetReader.
func (voteSet *VoteSet) Size() int {
if voteSet == nil {
if voteSet.IsEmpty() {
return 0
}
return voteSet.valSet.Size()
}
func (voteSet *VoteSet) IsEmpty() bool {
return voteSet == nil || voteSet.height == 0
}
// Returns added=true if vote is valid and new.
// Otherwise returns err=ErrVote[
//