state sync: tune request timeout and chunkers (backport #6566) (#6581)

* state sync: tune request timeout and chunkers (#6566)

(cherry picked from commit 7d961b55b2)

# Conflicts:
#	CHANGELOG_PENDING.md
#	config/config.go
#	internal/statesync/reactor.go
#	internal/statesync/reactor_test.go
#	node/node.go
#	statesync/syncer.go

* fix build

* fix config

* fix config

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Aleksandr Bezobchuk <aleks.bezobchuk@gmail.com>
Authored by mergify[bot] on 2021-06-15 15:10:16 -04:00; committed by GitHub.
commit be8c9833ca (parent 358b1f23c0)
9 changed files with 103 additions and 31 deletions
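Only the statesync portions of the diff are shown below; the changelog, config, docs, and node wiring changes are omitted on this page. In substance, the commit promotes two hard-coded syncer constants to per-node settings on config.StateSyncConfig. A minimal sketch of the two new fields, assuming the defaults carry over from the constants they replace and that the mapstructure keys mirror the field names (the authoritative definitions live in config/config.go, not shown here):

package config

import "time"

// StateSyncConfig sketch: only the two fields this commit adds are shown,
// and the mapstructure keys are assumptions inferred from the v0.34 naming
// convention, not copied from the real file.
type StateSyncConfig struct {
	// ...existing [statesync] fields elided...

	// ChunkRequestTimeout replaces the chunkRequestTimeout constant: how
	// long to wait for a requested chunk before re-requesting it,
	// possibly from a different peer.
	ChunkRequestTimeout time.Duration `mapstructure:"chunk_request_timeout"`

	// ChunkFetchers replaces the chunkFetchers constant: the number of
	// concurrent chunk fetchers to run.
	ChunkFetchers int32 `mapstructure:"chunk_fetchers"`
}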

statesync/reactor.go

@@ -6,6 +6,7 @@ import (
 	"time"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	tmsync "github.com/tendermint/tendermint/libs/sync"
 	"github.com/tendermint/tendermint/p2p"
 	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -28,6 +29,7 @@ const (
 type Reactor struct {
 	p2p.BaseReactor
 
+	cfg       config.StateSyncConfig
 	conn      proxy.AppConnSnapshot
 	connQuery proxy.AppConnQuery
 	tempDir   string
@@ -39,12 +41,20 @@ type Reactor struct {
 }
 
 // NewReactor creates a new state sync reactor.
-func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string) *Reactor {
+func NewReactor(
+	cfg config.StateSyncConfig,
+	conn proxy.AppConnSnapshot,
+	connQuery proxy.AppConnQuery,
+	tempDir string,
+) *Reactor {
 	r := &Reactor{
+		cfg:       cfg,
 		conn:      conn,
 		connQuery: connQuery,
 	}
 	r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
 	return r
 }
@@ -252,7 +262,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 		r.mtx.Unlock()
 		return sm.State{}, nil, errors.New("a state sync is already in progress")
 	}
-	r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
+	r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
 	r.mtx.Unlock()
 
 	hook := func() {
statesync/reactor_test.go

@@ -9,6 +9,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/p2p"
 	p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
 	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -60,7 +61,8 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
 	}
 
 	// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
-	r := NewReactor(conn, nil, "")
+	cfg := config.DefaultStateSyncConfig()
+	r := NewReactor(*cfg, conn, nil, "")
 	err := r.Start()
 	require.NoError(t, err)
 	t.Cleanup(func() {
@@ -137,7 +139,8 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
 	}
 
 	// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
-	r := NewReactor(conn, nil, "")
+	cfg := config.DefaultStateSyncConfig()
+	r := NewReactor(*cfg, conn, nil, "")
 	err := r.Start()
 	require.NoError(t, err)
 	t.Cleanup(func() {
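Both tests build their reactor from config.DefaultStateSyncConfig(). A plausible reconstruction of that constructor, building on the StateSyncConfig sketch above and inferred from the constants the next file deletes; the actual function lives in config/config.go, which this page does not show:

// DefaultStateSyncConfig (hypothetical reconstruction): the defaults are
// assumed to equal the former syncer constants, chunkRequestTimeout
// (10 * time.Second) and chunkFetchers (4).
func DefaultStateSyncConfig() *StateSyncConfig {
	return &StateSyncConfig{
		ChunkRequestTimeout: 10 * time.Second,
		ChunkFetchers:       4,
	}
}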

statesync/syncer.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/libs/log"
 	tmsync "github.com/tendermint/tendermint/libs/sync"
 	"github.com/tendermint/tendermint/p2p"
@@ -18,12 +19,9 @@ import (
 )
 
 const (
-	// chunkFetchers is the number of concurrent chunk fetchers to run.
-	chunkFetchers = 4
 	// chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
 	chunkTimeout = 2 * time.Minute
-	// requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
-	chunkRequestTimeout = 10 * time.Second
 	// minimumDiscoveryTime is the lowest allowable time for a
 	// SyncAny discovery time.
 	minimumDiscoveryTime = 5 * time.Second
@@ -52,6 +50,7 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
+	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot
@@ -64,9 +63,17 @@ type syncer struct {
 }
 
 // newSyncer creates a new syncer.
-func newSyncer(logger log.Logger, conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery,
-	stateProvider StateProvider, tempDir string) *syncer {
+func newSyncer(
+	cfg config.StateSyncConfig,
+	logger log.Logger,
+	conn proxy.AppConnSnapshot,
+	connQuery proxy.AppConnQuery,
+	stateProvider StateProvider,
+	tempDir string,
+) *syncer {
 	return &syncer{
+		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
@@ -243,7 +250,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	for i := int32(0); i < chunkFetchers; i++ {
+	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
 		go s.fetchChunks(ctx, snapshot, chunks)
 	}
@@ -396,16 +403,21 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-		ticker := time.NewTicker(chunkRequestTimeout)
+		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
 		defer ticker.Stop()
 
 		s.requestChunk(snapshot, index)
 
 		select {
 		case <-chunks.WaitFor(index):
 		case <-ticker.C:
 			s.requestChunk(snapshot, index)
 		case <-ctx.Done():
 			return
 		}
+
+		ticker.Stop()
 	}
 }
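The fetch loop's shape is unchanged: request a chunk, then sit in a select until the chunk arrives, the (now configurable) ticker fires and the chunk is re-requested, or the context is cancelled. A standalone sketch of that pattern with illustrative names, not Tendermint's own code:

package main

import (
	"context"
	"time"
)

// fetchWithRetry issues request() and re-issues it each time `timeout`
// elapses without the item arriving, until arrival or cancellation.
// It mirrors the ticker/select structure of fetchChunks above.
func fetchWithRetry(ctx context.Context, timeout time.Duration, request func(), arrived <-chan struct{}) error {
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()

	request()
	for {
		select {
		case <-arrived:
			return nil // chunk received
		case <-ticker.C:
			request() // timed out: re-request, possibly from a different peer
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}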

statesync/syncer_test.go

@@ -10,6 +10,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/libs/log"
 	tmsync "github.com/tendermint/tendermint/libs/sync"
 	"github.com/tendermint/tendermint/p2p"
@@ -31,7 +32,9 @@ func setupOfferSyncer(t *testing.T) (*syncer, *proxymocks.AppConnSnapshot) {
 	connSnapshot := &proxymocks.AppConnSnapshot{}
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-	syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+	cfg := config.DefaultStateSyncConfig()
+	syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
 	return syncer, connSnapshot
 }
@@ -83,7 +86,8 @@ func TestSyncer_SyncAny(t *testing.T) {
 	connSnapshot := &proxymocks.AppConnSnapshot{}
 	connQuery := &proxymocks.AppConnQuery{}
-	syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+	cfg := config.DefaultStateSyncConfig()
+	syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
 
 	// Adding a chunk should error when no sync is in progress
 	_, err := syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
@@ -406,7 +410,9 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
 	connSnapshot := &proxymocks.AppConnSnapshot{}
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-	syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+	cfg := config.DefaultStateSyncConfig()
+	syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
 	body := []byte{1, 2, 3}
 	chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
@@ -457,7 +463,9 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
 	connSnapshot := &proxymocks.AppConnSnapshot{}
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-	syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+	cfg := config.DefaultStateSyncConfig()
+	syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
 	chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
 	require.NoError(t, err)
@@ -520,7 +528,9 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
 	connSnapshot := &proxymocks.AppConnSnapshot{}
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-	syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+	cfg := config.DefaultStateSyncConfig()
+	syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
 	// Set up three peers across two snapshots, and ask for one of them to be banned.
 	// It should be banned from all snapshots.
@@ -633,7 +643,9 @@ func TestSyncer_verifyApp(t *testing.T) {
 	connQuery := &proxymocks.AppConnQuery{}
 	connSnapshot := &proxymocks.AppConnSnapshot{}
 	stateProvider := &mocks.StateProvider{}
-	syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+	cfg := config.DefaultStateSyncConfig()
+	syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
 	connQuery.On("InfoSync", proxy.RequestInfo).Return(tc.response, tc.err)
 	version, err := syncer.verifyApp(s)
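For operators, the upshot is that both values can now be tuned per node instead of being compile-time constants, for example to tolerate slow peers on large snapshots. A sketch of adjusting them programmatically; the constructor and field names come straight from the diffs above, while the matching config.toml keys under [statesync] are assumed to follow the same naming:

package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config"
)

func main() {
	// Start from the defaults, then wait longer per chunk and fetch more
	// chunks in parallel: a plausible tuning for slow or flaky peers.
	cfg := config.DefaultStateSyncConfig()
	cfg.ChunkRequestTimeout = 30 * time.Second
	cfg.ChunkFetchers = 8

	fmt.Printf("chunk_fetchers=%d chunk_request_timeout=%s\n",
		cfg.ChunkFetchers, cfg.ChunkRequestTimeout)
}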