mirror of
https://github.com/tendermint/tendermint.git
synced 2025-12-23 06:15:19 +00:00
statesync: improve stateprovider handling in the syncer (backport) (#6881)
This commit is contained in:
@@ -1,12 +1,10 @@
|
||||
package statesync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
@@ -42,8 +40,6 @@ func (s *snapshot) Key() snapshotKey {
|
||||
|
||||
// snapshotPool discovers and aggregates snapshots across peers.
|
||||
type snapshotPool struct {
|
||||
stateProvider StateProvider
|
||||
|
||||
tmsync.Mutex
|
||||
snapshots map[snapshotKey]*snapshot
|
||||
snapshotPeers map[snapshotKey]map[p2p.ID]p2p.Peer
|
||||
@@ -60,9 +56,8 @@ type snapshotPool struct {
|
||||
}
|
||||
|
||||
// newSnapshotPool creates a new snapshot pool. The state source is used for
|
||||
func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
|
||||
func newSnapshotPool() *snapshotPool {
|
||||
return &snapshotPool{
|
||||
stateProvider: stateProvider,
|
||||
snapshots: make(map[snapshotKey]*snapshot),
|
||||
snapshotPeers: make(map[snapshotKey]map[p2p.ID]p2p.Peer),
|
||||
formatIndex: make(map[uint32]map[snapshotKey]bool),
|
||||
@@ -78,14 +73,6 @@ func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
|
||||
// returns true if this was a new, non-blacklisted snapshot. The snapshot height is verified using
|
||||
// the light client, and the expected app hash is set for the snapshot.
|
||||
func (p *snapshotPool) Add(peer p2p.Peer, snapshot *snapshot) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
appHash, err := p.stateProvider.AppHash(ctx, snapshot.Height)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
snapshot.trustedAppHash = appHash
|
||||
key := snapshot.Key()
|
||||
|
||||
p.Lock()
|
||||
|
||||
@@ -4,12 +4,10 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
|
||||
"github.com/tendermint/tendermint/statesync/mocks"
|
||||
)
|
||||
|
||||
func TestSnapshot_Key(t *testing.T) {
|
||||
@@ -41,14 +39,11 @@ func TestSnapshot_Key(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_Add(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, uint64(1)).Return([]byte("app_hash"), nil)
|
||||
|
||||
peer := &p2pmocks.Peer{}
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
|
||||
// Adding to the pool should work
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
added, err := pool.Add(peer, &snapshot{
|
||||
Height: 1,
|
||||
Format: 1,
|
||||
@@ -73,15 +68,10 @@ func TestSnapshotPool_Add(t *testing.T) {
|
||||
// The pool should have populated the snapshot with the trusted app hash
|
||||
snapshot := pool.Best()
|
||||
require.NotNil(t, snapshot)
|
||||
assert.Equal(t, []byte("app_hash"), snapshot.trustedAppHash)
|
||||
|
||||
stateProvider.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestSnapshotPool_GetPeer(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}
|
||||
peerA := &p2pmocks.Peer{}
|
||||
@@ -115,9 +105,7 @@ func TestSnapshotPool_GetPeer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_GetPeers(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}
|
||||
peerA := &p2pmocks.Peer{}
|
||||
@@ -139,9 +127,7 @@ func TestSnapshotPool_GetPeers(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_Ranked_Best(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
// snapshots in expected order (best to worst). Highest height wins, then highest format.
|
||||
// Snapshots with different chunk hashes are considered different, and the most peers is
|
||||
@@ -184,9 +170,7 @@ func TestSnapshotPool_Ranked_Best(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_Reject(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
peer := &p2pmocks.Peer{}
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
|
||||
@@ -214,9 +198,7 @@ func TestSnapshotPool_Reject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_RejectFormat(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
peer := &p2pmocks.Peer{}
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
|
||||
@@ -245,9 +227,7 @@ func TestSnapshotPool_RejectFormat(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_RejectPeer(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
peerA := &p2pmocks.Peer{}
|
||||
peerA.On("ID").Return(p2p.ID("a"))
|
||||
@@ -287,9 +267,7 @@ func TestSnapshotPool_RejectPeer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_RemovePeer(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
peerA := &p2pmocks.Peer{}
|
||||
peerA.On("ID").Return(p2p.ID("a"))
|
||||
|
||||
@@ -106,10 +106,6 @@ func (s *lightClientStateProvider) AppHash(ctx context.Context, height uint64) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = s.lc.VerifyLightBlockAtHeight(ctx, int64(height), time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return header.AppHash, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
@@ -78,7 +79,7 @@ func newSyncer(
|
||||
stateProvider: stateProvider,
|
||||
conn: conn,
|
||||
connQuery: connQuery,
|
||||
snapshots: newSnapshotPool(stateProvider),
|
||||
snapshots: newSnapshotPool(),
|
||||
tempDir: tempDir,
|
||||
chunkFetchers: cfg.ChunkFetchers,
|
||||
retryTimeout: cfg.ChunkRequestTimeout,
|
||||
@@ -247,30 +248,51 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
|
||||
s.mtx.Unlock()
|
||||
}()
|
||||
|
||||
hctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
appHash, err := s.stateProvider.AppHash(hctx, snapshot.Height)
|
||||
if err != nil {
|
||||
s.logger.Info("failed to fetch and verify app hash", "err", err)
|
||||
if err == light.ErrNoWitnesses {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
return sm.State{}, nil, errRejectSnapshot
|
||||
}
|
||||
snapshot.trustedAppHash = appHash
|
||||
|
||||
// Offer snapshot to ABCI app.
|
||||
err := s.offerSnapshot(snapshot)
|
||||
err = s.offerSnapshot(snapshot)
|
||||
if err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
|
||||
// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fetchCtx, cancel := context.WithCancel(context.TODO())
|
||||
defer cancel()
|
||||
for i := int32(0); i < s.chunkFetchers; i++ {
|
||||
go s.fetchChunks(ctx, snapshot, chunks)
|
||||
go s.fetchChunks(fetchCtx, snapshot, chunks)
|
||||
}
|
||||
|
||||
pctx, pcancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
pctx, pcancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
||||
defer pcancel()
|
||||
|
||||
// Optimistically build new state, so we don't discover any light client failures at the end.
|
||||
state, err := s.stateProvider.State(pctx, snapshot.Height)
|
||||
if err != nil {
|
||||
return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
|
||||
s.logger.Info("failed to fetch and verify tendermint state", "err", err)
|
||||
if err == light.ErrNoWitnesses {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
return sm.State{}, nil, errRejectSnapshot
|
||||
}
|
||||
commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
|
||||
if err != nil {
|
||||
return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
|
||||
s.logger.Info("failed to fetch and verify commit", "err", err)
|
||||
if err == light.ErrNoWitnesses {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
return sm.State{}, nil, errRejectSnapshot
|
||||
}
|
||||
|
||||
// Restore snapshot
|
||||
|
||||
Reference in New Issue
Block a user