diff --git a/internal/statesync/chunks.go b/internal/statesync/chunks.go index 590f128da..84b6971b8 100644 --- a/internal/statesync/chunks.go +++ b/internal/statesync/chunks.go @@ -355,3 +355,16 @@ func (q *chunkQueue) WaitFor(index uint32) <-chan uint32 { return ch } + +func (q *chunkQueue) numChunksReturned() int { + q.Lock() + defer q.Unlock() + + cnt := 0 + for _, b := range q.chunkReturned { + if b { + cnt++ + } + } + return cnt +} diff --git a/internal/statesync/chunks_test.go b/internal/statesync/chunks_test.go index ad7f19b3b..e17c170bd 100644 --- a/internal/statesync/chunks_test.go +++ b/internal/statesync/chunks_test.go @@ -421,15 +421,7 @@ func TestChunkQueue_Retry(t *testing.T) { queue, teardown := setupChunkQueue(t) defer teardown() - // Allocate and add all chunks to the queue - for i := uint32(0); i < queue.Size(); i++ { - _, err := queue.Allocate() - require.NoError(t, err) - _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: i, Chunk: []byte{byte(i)}}) - require.NoError(t, err) - _, err = queue.Next() - require.NoError(t, err) - } + allocateAddChunksToQueue(t, queue) // Retrying a couple of chunks makes Next() return them, but they are not allocatable queue.Retry(3) @@ -454,15 +446,7 @@ func TestChunkQueue_RetryAll(t *testing.T) { queue, teardown := setupChunkQueue(t) defer teardown() - // Allocate and add all chunks to the queue - for i := uint32(0); i < queue.Size(); i++ { - _, err := queue.Allocate() - require.NoError(t, err) - _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: i, Chunk: []byte{byte(i)}}) - require.NoError(t, err) - _, err = queue.Next() - require.NoError(t, err) - } + allocateAddChunksToQueue(t, queue) _, err := queue.Next() assert.Equal(t, errDone, err) @@ -552,3 +536,29 @@ func TestChunkQueue_WaitFor(t *testing.T) { _, ok = <-w assert.False(t, ok) } + +func TestNumChunkReturned(t *testing.T) { + queue, teardown := setupChunkQueue(t) + defer teardown() + + assert.EqualValues(t, 5, queue.Size()) + + allocateAddChunksToQueue(t, queue) + assert.EqualValues(t, 5, queue.numChunksReturned()) + + err := queue.Close() + require.NoError(t, err) +} + +// Allocate and add all chunks to the queue +func allocateAddChunksToQueue(t *testing.T, q *chunkQueue) { + t.Helper() + for i := uint32(0); i < q.Size(); i++ { + _, err := q.Allocate() + require.NoError(t, err) + _, err = q.Add(&chunk{Height: 3, Format: 1, Index: i, Chunk: []byte{byte(i)}}) + require.NoError(t, err) + _, err = q.Next() + require.NoError(t, err) + } +} diff --git a/internal/statesync/metrics.go b/internal/statesync/metrics.go new file mode 100644 index 000000000..fb134f580 --- /dev/null +++ b/internal/statesync/metrics.go @@ -0,0 +1,91 @@ +package statesync + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this package. + MetricsSubsystem = "statesync" +) + +// Metrics contains metrics exposed by this package. +type Metrics struct { + TotalSnapshots metrics.Counter + ChunkProcessAvgTime metrics.Gauge + SnapshotHeight metrics.Gauge + SnapshotChunk metrics.Counter + SnapshotChunkTotal metrics.Gauge + BackFilledBlocks metrics.Counter + BackFillBlocksTotal metrics.Gauge +} + +// PrometheusMetrics returns Metrics build using Prometheus client library. +// Optionally, labels can be provided along with their values ("foo", +// "fooValue"). +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + TotalSnapshots: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "total_snapshots", + Help: "The total number of snapshots discovered.", + }, labels).With(labelsAndValues...), + ChunkProcessAvgTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "chunk_process_avg_time", + Help: "The average processing time per chunk.", + }, labels).With(labelsAndValues...), + SnapshotHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "snapshot_height", + Help: "The height of the current snapshot the has been processed.", + }, labels).With(labelsAndValues...), + SnapshotChunk: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "snapshot_chunk", + Help: "The current number of chunks that have been processed.", + }, labels).With(labelsAndValues...), + SnapshotChunkTotal: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "snapshot_chunks_total", + Help: "The total number of chunks in the current snapshot.", + }, labels).With(labelsAndValues...), + BackFilledBlocks: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "backfilled_blocks", + Help: "The current number of blocks that have been back-filled.", + }, labels).With(labelsAndValues...), + BackFillBlocksTotal: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "backfilled_blocks_total", + Help: "The total number of blocks that need to be back-filled.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + TotalSnapshots: discard.NewCounter(), + ChunkProcessAvgTime: discard.NewGauge(), + SnapshotHeight: discard.NewGauge(), + SnapshotChunk: discard.NewCounter(), + SnapshotChunkTotal: discard.NewGauge(), + BackFilledBlocks: discard.NewCounter(), + BackFillBlocksTotal: discard.NewGauge(), + } +} diff --git a/internal/statesync/mocks/Metricer.go b/internal/statesync/mocks/Metricer.go new file mode 100644 index 000000000..c4721b304 --- /dev/null +++ b/internal/statesync/mocks/Metricer.go @@ -0,0 +1,112 @@ +// Code generated by mockery 2.9.4. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// Metricer is an autogenerated mock type for the Metricer type +type Metricer struct { + mock.Mock +} + +// BackFillBlocksTotal provides a mock function with given fields: +func (_m *Metricer) BackFillBlocksTotal() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// BackFilledBlocks provides a mock function with given fields: +func (_m *Metricer) BackFilledBlocks() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// ChunkProcessAvgTime provides a mock function with given fields: +func (_m *Metricer) ChunkProcessAvgTime() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// SnapshotChunksCount provides a mock function with given fields: +func (_m *Metricer) SnapshotChunksCount() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// SnapshotChunksTotal provides a mock function with given fields: +func (_m *Metricer) SnapshotChunksTotal() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// SnapshotHeight provides a mock function with given fields: +func (_m *Metricer) SnapshotHeight() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// TotalSnapshots provides a mock function with given fields: +func (_m *Metricer) TotalSnapshots() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 18c6acc2f..c2f598a8c 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -125,6 +125,18 @@ const ( maxLightBlockRequestRetries = 20 ) +// Metricer defines an interface used for the rpc sync info query, please see statesync.metrics +// for the details. +type Metricer interface { + TotalSnapshots() int64 + ChunkProcessAvgTime() time.Duration + SnapshotHeight() int64 + SnapshotChunksCount() int64 + SnapshotChunksTotal() int64 + BackFilledBlocks() int64 + BackFillBlocksTotal() int64 +} + // Reactor handles state sync, both restoring snapshots for the local node and // serving snapshots for other nodes. type Reactor struct { @@ -158,6 +170,10 @@ type Reactor struct { syncer *syncer providers map[types.NodeID]*BlockProvider stateProvider StateProvider + + metrics *Metrics + backfillBlockTotal int64 + backfilledBlocks int64 } // NewReactor returns a reference to a new state sync reactor, which implements @@ -176,6 +192,7 @@ func NewReactor( stateStore sm.Store, blockStore *store.BlockStore, tempDir string, + ssMetrics *Metrics, ) *Reactor { r := &Reactor{ chainID: chainID, @@ -195,6 +212,7 @@ func NewReactor( peers: newPeerList(), dispatcher: NewDispatcher(blockCh.Out), providers: make(map[types.NodeID]*BlockProvider), + metrics: ssMetrics, } r.BaseService = *service.NewBaseService(logger, "StateSync", r) @@ -271,6 +289,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.snapshotCh.Out, r.chunkCh.Out, r.tempDir, + r.metrics, ) r.mtx.Unlock() defer func() { @@ -347,6 +366,9 @@ func (r *Reactor) backfill( r.Logger.Info("starting backfill process...", "startHeight", startHeight, "stopHeight", stopHeight, "stopTime", stopTime, "trustedBlockID", trustedBlockID) + r.backfillBlockTotal = startHeight - stopHeight + 1 + r.metrics.BackFillBlocksTotal.Set(float64(r.backfillBlockTotal)) + const sleepTime = 1 * time.Second var ( lastValidatorSet *types.ValidatorSet @@ -481,6 +503,16 @@ func (r *Reactor) backfill( lastValidatorSet = resp.block.ValidatorSet + r.backfilledBlocks++ + r.metrics.BackFilledBlocks.Add(1) + + // The block height might be less than the stopHeight because of the stopTime condition + // hasn't been fulfilled. + if resp.block.Height < stopHeight { + r.backfillBlockTotal++ + r.metrics.BackFillBlocksTotal.Set(float64(r.backfillBlockTotal)) + } + case <-queue.done(): if err := queue.error(); err != nil { return err @@ -1005,3 +1037,66 @@ func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initial } return nil } + +func (r *Reactor) TotalSnapshots() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.syncer != nil && r.syncer.snapshots != nil { + return int64(len(r.syncer.snapshots.snapshots)) + } + return 0 +} + +func (r *Reactor) ChunkProcessAvgTime() time.Duration { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.syncer != nil { + return time.Duration(r.syncer.avgChunkTime) + } + return time.Duration(0) +} + +func (r *Reactor) SnapshotHeight() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.syncer != nil { + return r.syncer.lastSyncedSnapshotHeight + } + return 0 +} +func (r *Reactor) SnapshotChunksCount() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.syncer != nil && r.syncer.chunks != nil { + return int64(r.syncer.chunks.numChunksReturned()) + } + return 0 +} + +func (r *Reactor) SnapshotChunksTotal() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.syncer != nil && r.syncer.processingSnapshot != nil { + return int64(r.syncer.processingSnapshot.Chunks) + } + return 0 +} + +func (r *Reactor) BackFilledBlocks() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.backfilledBlocks +} + +func (r *Reactor) BackFillBlocksTotal() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.backfillBlockTotal +} diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index e2f2c8225..6ab41b36c 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -29,6 +29,10 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + m = PrometheusMetrics(config.TestConfig().Instrumentation.Namespace) +) + type reactorTestSuite struct { reactor *Reactor syncer *syncer @@ -156,6 +160,7 @@ func setup( rts.stateStore, rts.blockStore, "", + m, ) rts.syncer = newSyncer( @@ -167,6 +172,7 @@ func setup( rts.snapshotOutCh, rts.chunkOutCh, "", + rts.reactor.metrics, ) require.NoError(t, rts.reactor.Start()) @@ -596,6 +602,9 @@ func TestReactor_Backfill(t *testing.T) { ) if failureRate > 3 { require.Error(t, err) + + require.NotEqual(t, rts.reactor.backfilledBlocks, rts.reactor.backfillBlockTotal) + require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal) } else { require.NoError(t, err) @@ -606,7 +615,12 @@ func TestReactor_Backfill(t *testing.T) { require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1)) require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1)) + + require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfilledBlocks) + require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal) } + require.Equal(t, rts.reactor.backfilledBlocks, rts.reactor.BackFilledBlocks()) + require.Equal(t, rts.reactor.backfillBlockTotal, rts.reactor.BackFillBlocksTotal()) }) } } diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 3591cb6b5..05cfffe64 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -63,8 +63,13 @@ type syncer struct { fetchers int32 retryTimeout time.Duration - mtx tmsync.RWMutex - chunks *chunkQueue + mtx tmsync.RWMutex + chunks *chunkQueue + metrics *Metrics + + avgChunkTime int64 + lastSyncedSnapshotHeight int64 + processingSnapshot *snapshot } // newSyncer creates a new syncer. @@ -76,6 +81,7 @@ func newSyncer( stateProvider StateProvider, snapshotCh, chunkCh chan<- p2p.Envelope, tempDir string, + metrics *Metrics, ) *syncer { return &syncer{ logger: logger, @@ -88,6 +94,7 @@ func newSyncer( tempDir: tempDir, fetchers: cfg.Fetchers, retryTimeout: cfg.ChunkRequestTimeout, + metrics: metrics, } } @@ -121,6 +128,7 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err return false, err } if added { + s.metrics.TotalSnapshots.Add(1) s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format, "hash", snapshot.Hash) } @@ -190,9 +198,14 @@ func (s *syncer) SyncAny( defer chunks.Close() // in case we forget to close it elsewhere } + s.processingSnapshot = snapshot + s.metrics.SnapshotChunkTotal.Set(float64(snapshot.Chunks)) + newState, commit, err := s.Sync(ctx, snapshot, chunks) switch { case err == nil: + s.metrics.SnapshotHeight.Set(float64(snapshot.Height)) + s.lastSyncedSnapshotHeight = int64(snapshot.Height) return newState, commit, nil case errors.Is(err, errAbort): @@ -237,6 +250,7 @@ func (s *syncer) SyncAny( } snapshot = nil chunks = nil + s.processingSnapshot = nil } } @@ -286,6 +300,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled. fetchCtx, cancel := context.WithCancel(ctx) defer cancel() + fetchStartTime := time.Now() for i := int32(0); i < s.fetchers; i++ { go s.fetchChunks(fetchCtx, snapshot, chunks) } @@ -324,7 +339,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu } // Restore snapshot - err = s.applyChunks(ctx, chunks) + err = s.applyChunks(ctx, chunks, fetchStartTime) if err != nil { return sm.State{}, nil, err } @@ -381,7 +396,7 @@ func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error { // applyChunks applies chunks to the app. It returns various errors depending on the app's // response, or nil once the snapshot is fully restored. -func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue) error { +func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue, start time.Time) error { for { chunk, err := chunks.Next() if err == errDone { @@ -423,6 +438,9 @@ func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue) error { switch resp.Result { case abci.ResponseApplySnapshotChunk_ACCEPT: + s.metrics.SnapshotChunk.Add(1) + s.avgChunkTime = time.Since(start).Nanoseconds() / int64(chunks.numChunksReturned()) + s.metrics.ChunkProcessAvgTime.Set(float64(s.avgChunkTime)) case abci.ResponseApplySnapshotChunk_ABORT: return errAbort case abci.ResponseApplySnapshotChunk_RETRY: diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go index d965c20be..867c99b0f 100644 --- a/internal/statesync/syncer_test.go +++ b/internal/statesync/syncer_test.go @@ -70,6 +70,8 @@ func TestSyncer_SyncAny(t *testing.T) { peerCID := types.NodeID("cc") rts := setup(t, connSnapshot, connQuery, stateProvider, 3) + rts.reactor.syncer = rts.syncer + // Adding a chunk should error when no sync is in progress _, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}}) require.Error(t, err) @@ -195,6 +197,16 @@ func TestSyncer_SyncAny(t *testing.T) { require.Equal(t, expectState, newState) require.Equal(t, commit, lastCommit) + require.Equal(t, len(chunks), int(rts.syncer.processingSnapshot.Chunks)) + require.Equal(t, expectState.LastBlockHeight, rts.syncer.lastSyncedSnapshotHeight) + require.True(t, rts.syncer.avgChunkTime > 0) + + require.Equal(t, int64(rts.syncer.processingSnapshot.Chunks), rts.reactor.SnapshotChunksTotal()) + require.Equal(t, rts.syncer.lastSyncedSnapshotHeight, rts.reactor.SnapshotHeight()) + require.Equal(t, time.Duration(rts.syncer.avgChunkTime), rts.reactor.ChunkProcessAvgTime()) + require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots()) + require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount()) + connSnapshot.AssertExpectations(t) connQuery.AssertExpectations(t) } @@ -448,6 +460,9 @@ func TestSyncer_applyChunks_Results(t *testing.T) { body := []byte{1, 2, 3} chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "") require.NoError(t, err) + + fetchStartTime := time.Now() + _, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body}) require.NoError(t, err) @@ -461,7 +476,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) { Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) } - err = rts.syncer.applyChunks(ctx, chunks) + err = rts.syncer.applyChunks(ctx, chunks, fetchStartTime) if tc.expectErr == unknownErr { require.Error(t, err) } else { @@ -498,6 +513,9 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) { chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "") require.NoError(t, err) + + fetchStartTime := time.Now() + added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}}) require.True(t, added) require.NoError(t, err) @@ -526,7 +544,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) { // check the queue contents, and finally close the queue to end the goroutine. // We don't really care about the result of applyChunks, since it has separate test. go func() { - rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error + rts.syncer.applyChunks(ctx, chunks, fetchStartTime) //nolint:errcheck // purposefully ignore error }() time.Sleep(50 * time.Millisecond) @@ -588,6 +606,8 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { chunks, err := newChunkQueue(s1, "") require.NoError(t, err) + fetchStartTime := time.Now() + added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID}) require.True(t, added) require.NoError(t, err) @@ -625,7 +645,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { // However, it will block on e.g. retry result, so we spawn a goroutine that will // be shut down when the chunk queue closes. go func() { - rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error + rts.syncer.applyChunks(ctx, chunks, fetchStartTime) //nolint:errcheck // purposefully ignore error }() time.Sleep(50 * time.Millisecond) diff --git a/node/node.go b/node/node.go index 6ec28515d..fa75a432d 100644 --- a/node/node.go +++ b/node/node.go @@ -240,16 +240,17 @@ func makeNode(config *cfg.Config, return nil, fmt.Errorf("failed to create peer manager: %w", err) } - csMetrics, p2pMetrics, memplMetrics, smMetrics := defaultMetricsProvider(config.Instrumentation)(genDoc.ChainID) + nodeMetrics := + defaultMetricsProvider(config.Instrumentation)(genDoc.ChainID) - router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, + router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey, peerManager, transport, getRouterConfig(config, proxyApp)) if err != nil { return nil, fmt.Errorf("failed to create router: %w", err) } mpReactorShim, mpReactor, mp, err := createMempoolReactor( - config, proxyApp, state, memplMetrics, peerManager, router, logger, + config, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger, ) if err != nil { return nil, err @@ -270,12 +271,12 @@ func makeNode(config *cfg.Config, mp, evPool, blockStore, - sm.BlockExecutorWithMetrics(smMetrics), + sm.BlockExecutorWithMetrics(nodeMetrics.state), ) csReactorShim, csReactor, csState := createConsensusReactor( config, state, blockExec, blockStore, mp, evPool, - privValidator, csMetrics, stateSync || blockSync, eventBus, + privValidator, nodeMetrics.cs, stateSync || blockSync, eventBus, peerManager, router, consensusLogger, ) @@ -283,7 +284,7 @@ func makeNode(config *cfg.Config, // doing a state sync first. bcReactorShim, bcReactor, err := createBlockchainReactor( logger, config, state, blockExec, blockStore, csReactor, - peerManager, router, blockSync && !stateSync, csMetrics, + peerManager, router, blockSync && !stateSync, nodeMetrics.cs, ) if err != nil { return nil, fmt.Errorf("could not create blockchain reactor: %w", err) @@ -300,9 +301,9 @@ func makeNode(config *cfg.Config, // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. // FIXME We need to update metrics here, since other reactors don't have access to them. if stateSync { - csMetrics.StateSyncing.Set(1) + nodeMetrics.cs.StateSyncing.Set(1) } else if blockSync { - csMetrics.BlockSyncing.Set(1) + nodeMetrics.cs.BlockSyncing.Set(1) } // Set up state sync reactor, and schedule a sync if requested. @@ -342,6 +343,7 @@ func makeNode(config *cfg.Config, stateStore, blockStore, config.StateSync.TempDir, + nodeMetrics.statesync, ) // add the channel descriptors to both the transports @@ -379,7 +381,7 @@ func makeNode(config *cfg.Config, if config.P2P.UseLegacy { // setup Transport and Switch sw = createSwitch( - config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, + config, transport, nodeMetrics.p2p, mpReactorShim, bcReactorForSwitch, stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, ) @@ -1035,20 +1037,37 @@ func defaultGenesisDocProviderFunc(config *cfg.Config) genesisDocProvider { } } -// metricsProvider returns a consensus, p2p and mempool Metrics. -type metricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempool.Metrics, *sm.Metrics) +type nodeMetrics struct { + cs *cs.Metrics + p2p *p2p.Metrics + mempool *mempool.Metrics + state *sm.Metrics + statesync *statesync.Metrics +} + +// metricsProvider returns consensus, p2p, mempool, state, statesync Metrics. +type metricsProvider func(chainID string) *nodeMetrics // defaultMetricsProvider returns Metrics build using Prometheus client library // if Prometheus is enabled. Otherwise, it returns no-op Metrics. func defaultMetricsProvider(config *cfg.InstrumentationConfig) metricsProvider { - return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempool.Metrics, *sm.Metrics) { + return func(chainID string) *nodeMetrics { if config.Prometheus { - return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), + return &nodeMetrics{ + cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), mempool.PrometheusMetrics(config.Namespace, "chain_id", chainID), - sm.PrometheusMetrics(config.Namespace, "chain_id", chainID) + sm.PrometheusMetrics(config.Namespace, "chain_id", chainID), + statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID), + } + } + return &nodeMetrics{ + cs.NopMetrics(), + p2p.NopMetrics(), + mempool.NopMetrics(), + sm.NopMetrics(), + statesync.NopMetrics(), } - return cs.NopMetrics(), p2p.NopMetrics(), mempool.NopMetrics(), sm.NopMetrics() } } diff --git a/rpc/client/mock/status_test.go b/rpc/client/mock/status_test.go index 3933c33c9..4f2f59f4c 100644 --- a/rpc/client/mock/status_test.go +++ b/rpc/client/mock/status_test.go @@ -20,12 +20,19 @@ func TestStatus(t *testing.T) { Call: mock.Call{ Response: &ctypes.ResultStatus{ SyncInfo: ctypes.SyncInfo{ - LatestBlockHash: bytes.HexBytes("block"), - LatestAppHash: bytes.HexBytes("app"), - LatestBlockHeight: 10, - MaxPeerBlockHeight: 20, - TotalSyncedTime: time.Second, - RemainingTime: time.Minute, + LatestBlockHash: bytes.HexBytes("block"), + LatestAppHash: bytes.HexBytes("app"), + LatestBlockHeight: 10, + MaxPeerBlockHeight: 20, + TotalSyncedTime: time.Second, + RemainingTime: time.Minute, + TotalSnapshots: 10, + ChunkProcessAvgTime: time.Duration(10), + SnapshotHeight: 10, + SnapshotChunksCount: 9, + SnapshotChunksTotal: 10, + BackFilledBlocks: 9, + BackFillBlocksTotal: 10, }, }}, } @@ -56,4 +63,12 @@ func TestStatus(t *testing.T) { assert.EqualValues(20, st.SyncInfo.MaxPeerBlockHeight) assert.EqualValues(time.Second, status.SyncInfo.TotalSyncedTime) assert.EqualValues(time.Minute, status.SyncInfo.RemainingTime) + + assert.EqualValues(10, st.SyncInfo.TotalSnapshots) + assert.EqualValues(time.Duration(10), st.SyncInfo.ChunkProcessAvgTime) + assert.EqualValues(10, st.SyncInfo.SnapshotHeight) + assert.EqualValues(9, status.SyncInfo.SnapshotChunksCount) + assert.EqualValues(10, status.SyncInfo.SnapshotChunksTotal) + assert.EqualValues(9, status.SyncInfo.BackFilledBlocks) + assert.EqualValues(10, status.SyncInfo.BackFillBlocksTotal) } diff --git a/rpc/core/env.go b/rpc/core/env.go index 091c8972b..9aaa499a2 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -11,6 +11,7 @@ import ( mempl "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" + "github.com/tendermint/tendermint/internal/statesync" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -91,12 +92,13 @@ type Environment struct { PeerManager peerManager // objects - PubKey crypto.PubKey - GenDoc *types.GenesisDoc // cache the genesis structure - EventSinks []indexer.EventSink - EventBus *types.EventBus // thread safe - Mempool mempl.Mempool - BlockSyncReactor consensus.BlockSyncReactor + PubKey crypto.PubKey + GenDoc *types.GenesisDoc // cache the genesis structure + EventSinks []indexer.EventSink + EventBus *types.EventBus // thread safe + Mempool mempl.Mempool + BlockSyncReactor consensus.BlockSyncReactor + StateSyncMetricer statesync.Metricer Logger log.Logger diff --git a/rpc/core/status.go b/rpc/core/status.go index 815ab37f5..c5a412369 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -77,6 +77,16 @@ func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, err ValidatorInfo: validatorInfo, } + if env.StateSyncMetricer != nil { + result.SyncInfo.TotalSnapshots = env.StateSyncMetricer.TotalSnapshots() + result.SyncInfo.ChunkProcessAvgTime = env.StateSyncMetricer.ChunkProcessAvgTime() + result.SyncInfo.SnapshotHeight = env.StateSyncMetricer.SnapshotHeight() + result.SyncInfo.SnapshotChunksCount = env.StateSyncMetricer.SnapshotChunksCount() + result.SyncInfo.SnapshotChunksTotal = env.StateSyncMetricer.SnapshotChunksTotal() + result.SyncInfo.BackFilledBlocks = env.StateSyncMetricer.BackFilledBlocks() + result.SyncInfo.BackFillBlocksTotal = env.StateSyncMetricer.BackFillBlocksTotal() + } + return result, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index caa9b8732..ecb058312 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -100,6 +100,14 @@ type SyncInfo struct { TotalSyncedTime time.Duration `json:"total_synced_time"` RemainingTime time.Duration `json:"remaining_time"` + + TotalSnapshots int64 `json:"total_snapshots"` + ChunkProcessAvgTime time.Duration `json:"chunk_process_avg_time"` + SnapshotHeight int64 `json:"snapshot_height"` + SnapshotChunksCount int64 `json:"snapshot_chunks_count"` + SnapshotChunksTotal int64 `json:"snapshot_chunks_total"` + BackFilledBlocks int64 `json:"backfilled_blocks"` + BackFillBlocksTotal int64 `json:"backfill_blocks_total"` } // Info about the node's validator diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index d5a9ffafa..a32d44986 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1396,6 +1396,27 @@ components: remaining_time: type: string example: "0" + total_snapshots: + type: string + example: "10" + chunk_process_avg_time: + type: string + example: "1000000000" + snapshot_height: + type: string + example: "1262196" + snapshot_chunks_count: + type: string + example: "10" + snapshot_chunks_total: + type: string + example: "100" + backfilled_blocks: + type: string + example: "10" + backfill_blocks_total: + type: string + example: "100" ValidatorInfo: type: object properties: