diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go
index 1e94eabe5..42497708e 100644
--- a/internal/statesync/dispatcher.go
+++ b/internal/statesync/dispatcher.go
@@ -115,10 +115,13 @@ func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
 // respond allows the underlying process which receives requests on the
 // requestCh to respond with the respective light block
 func (d *Dispatcher) Respond(lb *proto.LightBlock, peer types.NodeID) error {
-    fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
+    if lb != nil {
+        fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
+    } else {
+        fmt.Printf("responding with empty light block from %v\n", peer)
+    }
     d.mtx.Lock()
     defer d.mtx.Unlock()
-    fmt.Printf("responding with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
 
     // check that the response came from a request
     answerCh, ok := d.calls[peer]
@@ -189,7 +192,7 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
         return nil, provider.ErrLightBlockNotFound
     case errNoResponse:
         return nil, provider.ErrNoResponse
-    default:
+    default: // errDisconnected
         return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
     }
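Note on the Respond hunk: the nil guard is needed because a peer may now answer a request with an empty LightBlockResponse (see the handleLightBlockMessage hunk in reactor.go below, which sends LightBlock: nil when it has no block at the requested height); the second, unguarded Printf would have panicked on that path. A minimal runnable sketch of the pairing Respond performs — one outstanding call per peer, matched through the calls map. All type and field names here are simplified stand-ins for the real ones, not the actual API:

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // nodeID and lightBlock stand in for types.NodeID and the proto light block;
    // the real Dispatcher pairs one in-flight request per peer with its response.
    type nodeID string
    type lightBlock struct{ height int64 }

    type dispatcher struct {
        mtx   sync.Mutex
        calls map[nodeID]chan *lightBlock // one outstanding call per peer
    }

    // respond delivers lb to whoever is blocked on this peer's answer channel.
    // A nil lb is a valid answer meaning "peer has no block at that height".
    func (d *dispatcher) respond(lb *lightBlock, peer nodeID) error {
        d.mtx.Lock()
        defer d.mtx.Unlock()

        answerCh, ok := d.calls[peer]
        if !ok {
            return errors.New("response from a peer we never queried")
        }
        delete(d.calls, peer) // the call is consumed
        answerCh <- lb
        return nil
    }

    func main() {
        d := &dispatcher{calls: map[nodeID]chan *lightBlock{}}
        ch := make(chan *lightBlock, 1)
        d.calls["peer-a"] = ch

        if err := d.respond(&lightBlock{height: 7}, "peer-a"); err != nil {
            fmt.Println("error:", err)
        }
        fmt.Println("got block at height", (<-ch).height)
    }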
diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go
index 4072d9a28..a319dd952 100644
--- a/internal/statesync/dispatcher_test.go
+++ b/internal/statesync/dispatcher_test.go
@@ -23,6 +23,7 @@ var (
 
 func TestDispatcherBasic(t *testing.T) {
     t.Cleanup(leaktest.Check(t))
+    numPeers := 5
 
     ch := make(chan p2p.Envelope, 100)
     closeCh := make(chan struct{})
@@ -31,20 +32,20 @@ func TestDispatcherBasic(t *testing.T) {
     d := NewDispatcher(ch, 1*time.Second)
     go handleRequests(t, d, ch, closeCh)
 
-    peers := createPeerSet(5)
+    peers := createPeerSet(numPeers)
     wg := sync.WaitGroup{}
 
     // make a bunch of async requests and require that the correct responses are
     // given
-    for i := 1; i < 10; i++ {
+    for i := 0; i < numPeers; i++ {
         wg.Add(1)
         go func(height int64) {
             defer wg.Done()
-            lb, err := d.LightBlock(context.Background(), height, peers[i])
+            lb, err := d.LightBlock(context.Background(), height, peers[height-1])
             require.NoError(t, err)
             require.NotNil(t, lb)
             require.Equal(t, lb.Height, height)
-        }(int64(i))
+        }(int64(i + 1))
     }
 
     wg.Wait()
@@ -71,21 +72,6 @@ func TestDispatcherReturnsNoBlock(t *testing.T) {
     require.Nil(t, err)
 }
 
-func TestBlockProviderTimeOutWaitingOnLightBlock(t *testing.T) {
-    t.Cleanup(leaktest.Check(t))
-    ch := make(chan p2p.Envelope, 100)
-    d := NewDispatcher(ch, 1*time.Second)
-
-    closeCh := make(chan struct{})
-    defer close(closeCh)
-    go handleRequests(t, d, ch, closeCh)
-
-    provider := NewBlockProvider(peer, "my-chain", d)
-    lb, err := provider.LightBlock(context.Background(), 1)
-    require.NoError(t, err)
-    require.NotNil(t, lb)
-}
-
 func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) {
     t.Cleanup(leaktest.Check(t))
     ch := make(chan p2p.Envelope, 100)
@@ -101,6 +87,21 @@ func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) {
     require.Nil(t, lb)
 }
 
+func TestDispatcherTimeOutReturnsNoResponse(t *testing.T) {
+    t.Cleanup(leaktest.Check(t))
+    ch := make(chan p2p.Envelope, 100)
+    d := NewDispatcher(ch, 10*time.Millisecond)
+
+    ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
+    defer cancelFunc()
+
+    lb, err := d.LightBlock(ctx, 1, peer)
+
+    require.Error(t, err)
+    require.Equal(t, errNoResponse, err)
+    require.Nil(t, lb)
+}
+
 func TestDispatcherProviders(t *testing.T) {
     t.Cleanup(leaktest.Check(t))
 
@@ -127,6 +128,15 @@ func TestDispatcherProviders(t *testing.T) {
     }
 }
 
+func TestDispatcherStopped(t *testing.T) {
+    ch := make(chan p2p.Envelope, 100)
+    d := NewDispatcher(ch, 5*time.Second)
+    d.Stop()
+
+    _, err := d.LightBlock(context.Background(), 1, peer)
+    require.Error(t, err)
+}
+
 func TestPeerListBasic(t *testing.T) {
     t.Cleanup(leaktest.Check(t))
     peerList := newPeerList()
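The TestDispatcherBasic hunk above also fixes a loop-variable capture bug: the old loop read peers[i] inside the goroutine while i kept advancing in the parent goroutine — a data race, and with five peers and i running up to 9, a likely index-out-of-range — whereas the new code passes the value into the closure as an argument. A standalone illustration of the pitfall and the fix (this matters under pre-Go 1.22 semantics, where the loop variable was shared across iterations; Go 1.22 made it per-iteration):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        items := []string{"a", "b", "c"}
        var wg sync.WaitGroup

        // Buggy pattern (pre-Go 1.22): every goroutine reads the shared loop
        // variable i, which may have advanced — or run past len(items) — by the
        // time the goroutine is scheduled.
        //
        // for i := 0; i < len(items); i++ {
        //     go func() { fmt.Println(items[i]) }() // race / possible out-of-range
        // }

        // Fixed pattern, as in the test: pass the value into the closure.
        for i := 0; i < len(items); i++ {
            wg.Add(1)
            go func(idx int) {
                defer wg.Done()
                fmt.Println(items[idx])
            }(i)
        }
        wg.Wait()
    }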
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index 8157d5934..75575388c 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -130,6 +130,8 @@ const (
 type Reactor struct {
     service.BaseService
 
+    chainID       string
+    initialHeight int64
     cfg        config.StateSyncConfig
     stateStore sm.Store
     blockStore *store.BlockStore
@@ -143,14 +145,19 @@ type Reactor struct {
     paramsCh    *p2p.Channel
     peerUpdates *p2p.PeerUpdates
     closeCh     chan struct{}
-    peers       *peerList
-    dispatcher  *Dispatcher
+
+    // dispatcher is used to multiplex light block requests and responses over multiple
+    // block providers used by the p2p state provider and in reverse sync.
+    dispatcher *Dispatcher
+    peers      *peerList
 
     // These will only be set when a state sync is in progress. It is used to feed
-    // received snapshots and chunks into the sync. And to fetch light blocks and
-    // consensus params for verification and building of tendermint state.
-    mtx    tmsync.RWMutex
+    // received snapshots and chunks into the syncer and to manage incoming and
+    // outgoing providers.
+    mtx           tmsync.RWMutex
     syncer        *syncer
+    providers     map[types.NodeID]*blockProvider
+    stateProvider StateProvider
 }
 
 // NewReactor returns a reference to a new state sync reactor, which implements
@@ -158,6 +165,8 @@
 // and querying, references to p2p Channels and a channel to listen for peer
 // updates on. Note, the reactor will close all p2p Channels when stopping.
 func NewReactor(
+    chainID string,
+    initialHeight int64,
     cfg config.StateSyncConfig,
     logger log.Logger,
     conn proxy.AppConnSnapshot,
@@ -169,6 +178,8 @@ func NewReactor(
     tempDir string,
 ) *Reactor {
     r := &Reactor{
+        chainID:       chainID,
+        initialHeight: initialHeight,
         cfg:           cfg,
         conn:          conn,
         connQuery:     connQuery,
@@ -183,6 +193,7 @@ func NewReactor(
         blockStore: blockStore,
         peers:      newPeerList(),
         dispatcher: NewDispatcher(blockCh.Out, lightBlockResponseTimeout),
+        providers:  make(map[types.NodeID]*blockProvider),
     }
     r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@@ -244,53 +255,24 @@ func (r *Reactor) Sync(
     chainID string,
     initialHeight int64,
 ) (sm.State, error) {
+    r.waitForEnoughPeers(ctx, 3)
     r.mtx.Lock()
     if r.syncer != nil {
         r.mtx.Unlock()
         return sm.State{}, errors.New("a state sync is already in progress")
     }
-    r.mtx.Unlock()
 
-    to := light.TrustOptions{
-        Period: r.cfg.TrustPeriod,
-        Height: r.cfg.TrustHeight,
-        Hash:   r.cfg.TrustHashBytes(),
-    }
-    spLogger := r.Logger.With("module", "stateprovider")
+    if err := r.initStateProvider(ctx, chainID, initialHeight); err != nil {
+        r.mtx.Unlock()
+        return sm.State{}, err
+    }
 
-    var (
-        stateProvider StateProvider
-        err           error
-    )
-    if r.cfg.UseP2P {
-        // state provider needs at least two connected peers to initialize
-        spLogger.Info("Generating P2P state provider")
-        r.waitForEnoughPeers(ctx, 2)
-        peers := r.peers.All()
-        providers := make([]provider.Provider, len(peers))
-        for idx, p := range peers {
-            providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
-        }
-
-        stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers,
-            to, r.paramsCh.Out, spLogger)
-        if err != nil {
-            return sm.State{}, err
-        }
-        spLogger.Info("Finished generating P2P state provider")
-    } else {
-        stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
-        if err != nil {
-            return sm.State{}, err
-        }
-    }
-
-    r.mtx.Lock()
     r.syncer = newSyncer(
         r.cfg,
         r.Logger,
         r.conn,
         r.connQuery,
-        stateProvider,
+        r.stateProvider,
         r.snapshotCh.Out,
         r.chunkCh.Out,
         r.tempDir,
@@ -298,8 +277,9 @@ func (r *Reactor) Sync(
     r.mtx.Unlock()
 
     defer func() {
         r.mtx.Lock()
+        // reset syncing objects at the close of Sync
         r.syncer = nil
-        r.dispatcher = nil
+        r.stateProvider = nil
         r.mtx.Unlock()
     }()
@@ -311,7 +291,6 @@ func (r *Reactor) Sync(
         }
     }
-    r.Logger.Info("sync any starting")
     state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
     if err != nil {
         return sm.State{}, err
     }
@@ -393,6 +372,7 @@ func (r *Reactor) backfill(
             peer := r.peers.Pop(ctx)
             r.Logger.Debug("fetching next block", "height", height, "peer", peer)
             lb, err := r.dispatcher.LightBlock(ctxWithCancel, height, peer)
+            r.peers.Append(peer)
             if errors.Is(err, context.Canceled) {
                 return
             }
@@ -571,9 +551,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
             )
             return nil
         }
-        if msg.Height == 3 {
-            fmt.Println("received snapshot for height 3")
-        }
+        logger.Info("added snapshot", "height", msg.Height, "format", msg.Format)
 
     default:
         return fmt.Errorf("received unknown message: %T", msg)
@@ -681,7 +659,17 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
             r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height)
             return err
         }
-        r.Logger.Info("fetched light block", "height", lb.SignedHeader.Header.Height)
+        if lb == nil {
+            r.Logger.Info("returning nil light block", "height", msg.Height)
+            r.blockCh.Out <- p2p.Envelope{
+                To: envelope.From,
+                Message: &ssproto.LightBlockResponse{
+                    LightBlock: nil,
+                },
+            }
+            r.Logger.Info("sent light block response", "height", msg.Height)
+            return nil
+        }
 
         lbproto, err := lb.ToProto()
         if err != nil {
@@ -701,10 +689,8 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
     case *ssproto.LightBlockResponse:
         r.Logger.Info("received light block response")
-        if r.dispatcher != nil {
-            if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
-                r.Logger.Error("error processing light block response", "err", err)
-            }
+        if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
+            r.Logger.Error("error processing light block response", "err", err)
         }
 
     default:
@@ -738,19 +724,16 @@ func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
         defer r.mtx.RUnlock()
         r.Logger.Debug("received consensus params response", "height", msg.Height)
 
-        if r.syncer == nil {
-            r.Logger.Debug("received unexpected params response; no state sync in progress", "peer", envelope.From)
-            return nil
-        }
-
         cp := types.ConsensusParamsFromProto(msg.ConsensusParams)
-        if sp, ok := r.syncer.stateProvider.(*stateProviderP2P); ok {
+        if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
             r.Logger.Debug("passing along message")
             select {
             case sp.paramsRecvCh <- cp:
             default:
             }
+        } else {
+            r.Logger.Debug("received unexpected params response; using RPC state provider", "peer", envelope.From)
         }
 
     default:
@@ -852,21 +835,33 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
 // handle the PeerUpdate or if a panic is recovered.
 func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
     r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
-    r.mtx.Lock()
-    defer r.mtx.Unlock()
 
     switch peerUpdate.Status {
     case p2p.PeerStatusUp:
         r.peers.Append(peerUpdate.NodeID)
-        if r.syncer != nil {
-            r.syncer.AddPeer(peerUpdate.NodeID)
-        }
-
     case p2p.PeerStatusDown:
         r.peers.Remove(peerUpdate.NodeID)
-        if r.syncer != nil {
-            r.syncer.RemovePeer(peerUpdate.NodeID)
-        }
+    }
+
+    r.mtx.Lock()
+    if r.syncer == nil {
+        r.mtx.Unlock()
+        return
+    }
+    defer r.mtx.Unlock()
+
+    switch peerUpdate.Status {
+    case p2p.PeerStatusUp:
+        newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
+        r.providers[peerUpdate.NodeID] = newProvider
+        r.syncer.AddPeer(peerUpdate.NodeID)
+        // if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
+        //     sp.addProvider(newProvider)
+        // }
+
+    case p2p.PeerStatusDown:
+        delete(r.providers, peerUpdate.NodeID)
+        r.syncer.RemovePeer(peerUpdate.NodeID)
     }
     r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
 }
@@ -972,3 +967,35 @@
         }
     }
 }
+
+func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
+    var err error
+    to := light.TrustOptions{
+        Period: r.cfg.TrustPeriod,
+        Height: r.cfg.TrustHeight,
+        Hash:   r.cfg.TrustHashBytes(),
+    }
+    spLogger := r.Logger.With("module", "stateprovider")
+
+    if r.cfg.UseP2P {
+        spLogger.Info("Generating P2P state provider")
+
+        peers := r.peers.All()
+        providers := make([]provider.Provider, len(peers))
+        for idx, p := range peers {
+            providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
+        }
+
+        r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
+        if err != nil {
+            return err
+        }
+        spLogger.Info("Finished generating P2P state provider")
+    } else {
+        r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
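One detail of the handleParamsMessage hunk worth calling out: the forwarding into the p2p state provider is a select with an empty default, i.e. a non-blocking send, so a params response that nobody is currently waiting for is dropped rather than wedging the reactor's receive loop. A runnable sketch of just that pattern — consensusParams below is a stand-in for types.ConsensusParams, and the sleep/retry is only demo scaffolding:

    package main

    import (
        "fmt"
        "time"
    )

    type consensusParams struct{ maxBytes int64 }

    // deliver mirrors the reactor's forwarding step: a non-blocking send, so a
    // params response nobody is waiting for is dropped instead of blocking the
    // reactor's message loop.
    func deliver(recvCh chan<- consensusParams, cp consensusParams) bool {
        select {
        case recvCh <- cp:
            return true
        default:
            return false
        }
    }

    func main() {
        recvCh := make(chan consensusParams) // unbuffered rendezvous channel

        // No requester is blocked on recvCh yet: the response is dropped.
        fmt.Println("delivered:", deliver(recvCh, consensusParams{maxBytes: 1024}))

        done := make(chan struct{})
        go func() { // the p2p state provider side, waiting for params
            cp := <-recvCh
            fmt.Println("requester got maxBytes =", cp.maxBytes)
            close(done)
        }()
        for !deliver(recvCh, consensusParams{maxBytes: 2048}) {
            time.Sleep(time.Millisecond) // retry until the requester is ready
        }
        <-done
    }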
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index 357be2dda..f7c913a1d 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -19,7 +19,6 @@ import (
     "github.com/tendermint/tendermint/internal/statesync/mocks"
     "github.com/tendermint/tendermint/internal/test/factory"
     "github.com/tendermint/tendermint/libs/log"
-    "github.com/tendermint/tendermint/light"
     "github.com/tendermint/tendermint/light/provider"
     ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
     tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -142,6 +142,8 @@ func setup(
     cfg := config.DefaultStateSyncConfig()
 
     rts.reactor = NewReactor(
+        factory.DefaultTestChainID,
+        1,
         *cfg,
         log.TestingLogger(),
         conn,
@@ -442,19 +444,20 @@
 }
 
-func TestReactor_P2P_Provider(t *testing.T) {
+func TestReactor_StateProviderP2P(t *testing.T) {
     rts := setup(t, nil, nil, nil, 2)
 
-    rts.peerUpdateCh <- p2p.PeerUpdate{
-        NodeID: types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength)),
-        Status: p2p.PeerStatusUp,
-    }
-    rts.peerUpdateCh <- p2p.PeerUpdate{
-        NodeID: types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength)),
-        Status: p2p.PeerStatusUp,
-    }
-
     // make syncer non nil else test won't think we are state syncing
     rts.reactor.syncer = rts.syncer
+    peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
+    peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
+    rts.peerUpdateCh <- p2p.PeerUpdate{
+        NodeID: peerA,
+        Status: p2p.PeerStatusUp,
+    }
+    rts.peerUpdateCh <- p2p.PeerUpdate{
+        NodeID: peerB,
+        Status: p2p.PeerStatusUp,
+    }
 
     closeCh := make(chan struct{})
     defer close(closeCh)
@@ -463,42 +466,32 @@
     go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
     go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
 
-    // we now test the p2p state provider
-    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-    defer cancel()
-
-    peers := rts.reactor.peers.All()
-    require.Len(t, peers, 2)
-
-    lb, err := rts.reactor.dispatcher.LightBlock(ctx, 2, peers[0])
+    rts.reactor.cfg.UseP2P = true
+    rts.reactor.cfg.TrustHeight = 1
+    rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
+
+    ctx := context.Background()
+    err := rts.reactor.initStateProvider(ctx, factory.DefaultTestChainID, 1)
     require.NoError(t, err)
+    rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
 
-    to := light.TrustOptions{
-        Period: 24 * time.Hour,
-        Height: lb.Height,
-        Hash:   lb.Hash(),
-    }
-
-    providers := make([]provider.Provider, len(peers))
-    for idx, peer := range peers {
-        providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
-    }
-
-    p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, providers,
-        to, rts.reactor.paramsCh.Out, log.TestingLogger())
-    require.NoError(t, err)
-
-    // set the state provider else the test won't think we are state syncing
-    rts.reactor.syncer = rts.syncer
-    rts.syncer.stateProvider = p2pStateProvider
-
-    appHash, err := p2pStateProvider.AppHash(ctx, 5)
+    appHash, err := rts.reactor.stateProvider.AppHash(ctx, 5)
     require.NoError(t, err)
     require.Len(t, appHash, 32)
 
-    state, err := p2pStateProvider.State(ctx, 5)
+    state, err := rts.reactor.stateProvider.State(ctx, 5)
     require.NoError(t, err)
     require.Equal(t, appHash, state.AppHash)
+    require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
 
-    commit, err := p2pStateProvider.Commit(ctx, 5)
+    commit, err := rts.reactor.stateProvider.Commit(ctx, 5)
     require.NoError(t, err)
     require.Equal(t, commit.BlockID, state.LastBlockID)
+
+    added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
+        Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
+    })
+    require.NoError(t, err)
+    require.True(t, added)
 }
 
 func TestReactor_Backfill(t *testing.T) {
diff --git a/internal/statesync/snapshots.go b/internal/statesync/snapshots.go
index 9058304a9..309d2484d 100644
--- a/internal/statesync/snapshots.go
+++ b/internal/statesync/snapshots.go
@@ -1,13 +1,11 @@
 package statesync
 
 import (
-    "context"
     "crypto/sha256"
     "fmt"
     "math/rand"
     "sort"
     "strings"
-    "time"
 
     tmsync "github.com/tendermint/tendermint/internal/libs/sync"
     "github.com/tendermint/tendermint/types"
@@ -43,8 +41,6 @@ func (s *snapshot) Key() snapshotKey {
 
 // snapshotPool discovers and aggregates snapshots across peers.
 type snapshotPool struct {
-    stateProvider StateProvider
-
     tmsync.Mutex
     snapshots     map[snapshotKey]*snapshot
     snapshotPeers map[snapshotKey]map[types.NodeID]types.NodeID
@@ -61,9 +57,8 @@ type snapshotPool struct {
 }
 
-// newSnapshotPool creates a new snapshot pool. The state source is used for
-func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
+// newSnapshotPool creates a new snapshot pool.
+func newSnapshotPool() *snapshotPool {
     return &snapshotPool{
-        stateProvider: stateProvider,
         snapshots:     make(map[snapshotKey]*snapshot),
         snapshotPeers: make(map[snapshotKey]map[types.NodeID]types.NodeID),
         formatIndex:   make(map[uint32]map[snapshotKey]bool),
@@ -80,14 +75,6 @@ func newSnapshotPool() *snapshotPool {
-// snapshot height is verified using the light client, and the expected app hash
-// is set for the snapshot.
+// caller is responsible for verifying the snapshot against the light client and
+// for setting the trusted app hash beforehand.
 func (p *snapshotPool) Add(peerID types.NodeID, snapshot *snapshot) (bool, error) {
-    ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
-    defer cancel()
-
-    appHash, err := p.stateProvider.AppHash(ctx, snapshot.Height)
-    if err != nil {
-        return false, fmt.Errorf("failed to get app hash: %w", err)
-    }
-    snapshot.trustedAppHash = appHash
     key := snapshot.Key()
 
     p.Lock()
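With the state provider gone from the pool, newSnapshotPool takes no arguments and Add is pure bookkeeping; fetching and pinning the trusted app hash is now the caller's job, which the syncer.AddSnapshot hunk further down takes over. A reduced sketch of the new division of labour — all types and helpers here are simplified stand-ins, not the real statesync API:

    package main

    import (
        "errors"
        "fmt"
    )

    type snapshot struct {
        height         uint64
        trustedAppHash []byte
    }

    // pool is pure bookkeeping now: it never performs network I/O.
    type pool struct{ byHeight map[uint64]*snapshot }

    func (p *pool) add(s *snapshot) (bool, error) {
        if s.trustedAppHash == nil {
            return false, errors.New("caller must verify the snapshot first")
        }
        if _, ok := p.byHeight[s.height]; ok {
            return false, nil // duplicate
        }
        p.byHeight[s.height] = s
        return true, nil
    }

    // appHashAt stands in for StateProvider.AppHash: the caller-side verify step.
    func appHashAt(height uint64) ([]byte, error) { return []byte{0xab, 0xcd}, nil }

    func main() {
        p := &pool{byHeight: map[uint64]*snapshot{}}
        s := &snapshot{height: 1}

        // the verification that used to live inside pool.Add:
        hash, err := appHashAt(s.height)
        if err != nil {
            panic(err)
        }
        s.trustedAppHash = hash

        added, err := p.add(s)
        fmt.Println(added, err)
    }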
diff --git a/internal/statesync/snapshots_test.go b/internal/statesync/snapshots_test.go
index 6f27269f7..08cb08269 100644
--- a/internal/statesync/snapshots_test.go
+++ b/internal/statesync/snapshots_test.go
@@ -3,10 +3,8 @@ package statesync
 import (
     "testing"
 
-    "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
 
-    "github.com/tendermint/tendermint/internal/statesync/mocks"
     "github.com/tendermint/tendermint/types"
 )
 
@@ -39,13 +37,10 @@ func TestSnapshot_Key(t *testing.T) {
 }
 
 func TestSnapshotPool_Add(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, uint64(1)).Return([]byte("app_hash"), nil)
-
     peerID := types.NodeID("aa")
 
     // Adding to the pool should work
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
     added, err := pool.Add(peerID, &snapshot{
         Height: 1,
         Format: 1,
@@ -66,18 +61,12 @@ func TestSnapshotPool_Add(t *testing.T) {
     require.NoError(t, err)
     require.False(t, added)
 
-    // The pool should have populated the snapshot with the trusted app hash
     snapshot := pool.Best()
     require.NotNil(t, snapshot)
-    require.Equal(t, []byte("app_hash"), snapshot.trustedAppHash)
-
-    stateProvider.AssertExpectations(t)
 }
 
 func TestSnapshotPool_GetPeer(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}
 
@@ -112,9 +101,7 @@ func TestSnapshotPool_GetPeer(t *testing.T) {
 }
 
 func TestSnapshotPool_GetPeers(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}
 
@@ -137,9 +124,7 @@ func TestSnapshotPool_GetPeers(t *testing.T) {
 }
 
 func TestSnapshotPool_Ranked_Best(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     // snapshots in expected order (best to worst). Highest height wins, then highest format.
     // Snapshots with different chunk hashes are considered different, and the most peers is
@@ -182,9 +167,7 @@ func TestSnapshotPool_Ranked_Best(t *testing.T) {
 }
 
 func TestSnapshotPool_Reject(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     peerID := types.NodeID("aa")
 
@@ -212,9 +195,7 @@ func TestSnapshotPool_Reject(t *testing.T) {
 }
 
 func TestSnapshotPool_RejectFormat(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     peerID := types.NodeID("aa")
 
@@ -243,9 +224,7 @@ func TestSnapshotPool_RejectFormat(t *testing.T) {
 }
 
 func TestSnapshotPool_RejectPeer(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     peerAID := types.NodeID("aa")
     peerBID := types.NodeID("bb")
@@ -285,9 +264,7 @@ func TestSnapshotPool_RejectPeer(t *testing.T) {
 }
 
 func TestSnapshotPool_RemovePeer(t *testing.T) {
-    stateProvider := &mocks.StateProvider{}
-    stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    pool := newSnapshotPool(stateProvider)
+    pool := newSnapshotPool()
 
     peerAID := types.NodeID("aa")
     peerBID := types.NodeID("bb")
diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go
index 8c6233f13..babaf482f 100644
--- a/internal/statesync/stateprovider.go
+++ b/internal/statesync/stateprovider.go
@@ -333,12 +333,16 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
     return state, nil
 }
 
+// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a
+// heuristic: too many overburdens the network and too few compromises the second layer of security.
 func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
     if len(s.lc.Witnesses()) < 6 {
         s.lc.AddProvider(p)
     }
 }
 
+// consensusParams sends out a request for consensus params, blocking until one is returned.
+// If it fails to get a valid set of consensus params from any of the providers, it returns an error.
 func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) {
     providers := s.lc.Witnesses()
     for _, provider := range providers {
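The consensusParams body is truncated in the hunk above; going by the new doc comment and the visible first lines, it tries each of the light client's witnesses in turn until one returns a usable set of params. A hedged sketch of that try-each-provider shape — the interface and the per-attempt timeout below are assumptions for illustration, not the actual light/provider.Provider API:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    type consensusParams struct{ maxBytes int64 }

    // paramsProvider is a stand-in for the subset of the provider API this
    // sketch needs; the real code talks to light/provider.Provider values.
    type paramsProvider interface {
        ConsensusParams(ctx context.Context, height int64) (consensusParams, error)
    }

    // fetchConsensusParams asks each provider in turn, bounding every attempt,
    // and returns the first answer — the blocking round the doc comment describes.
    func fetchConsensusParams(ctx context.Context, providers []paramsProvider, height int64) (consensusParams, error) {
        for _, p := range providers {
            attemptCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            cp, err := p.ConsensusParams(attemptCtx, height)
            cancel()
            if err == nil {
                return cp, nil
            }
        }
        return consensusParams{}, errors.New("no provider returned consensus params")
    }

    type fakeProvider struct{ fail bool }

    func (f fakeProvider) ConsensusParams(ctx context.Context, height int64) (consensusParams, error) {
        if f.fail {
            return consensusParams{}, errors.New("unavailable")
        }
        return consensusParams{maxBytes: 1024}, nil
    }

    func main() {
        cp, err := fetchConsensusParams(context.Background(),
            []paramsProvider{fakeProvider{fail: true}, fakeProvider{}}, 5)
        fmt.Println(cp, err)
    }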
diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go
index 24a50bfdb..2a1f45e63 100644
--- a/internal/statesync/syncer.go
+++ b/internal/statesync/syncer.go
@@ -84,7 +84,7 @@ func newSyncer(
         stateProvider: stateProvider,
         conn:          conn,
         connQuery:     connQuery,
-        snapshots:     newSnapshotPool(stateProvider),
+        snapshots:     newSnapshotPool(),
         snapshotCh:    snapshotCh,
         chunkCh:       chunkCh,
         tempDir:       tempDir,
@@ -118,6 +118,19 @@ func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
 // AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
 // snapshot was accepted and added.
 func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, error) {
+    ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
+    defer cancel()
+
+    // Fetch the app hash corresponding to the snapshot.
+    // TODO: We do all this computation for each and every snapshot we receive (even after
+    // attempting to fetch the chunks and restore state). We should only do this when a snapshot
+    // is selected and the actual sync begins.
+    appHash, err := s.stateProvider.AppHash(ctx, snapshot.Height)
+    if err != nil {
+        return false, fmt.Errorf("failed to get app hash: %w", err)
+    }
+    snapshot.trustedAppHash = appHash
+
     added, err := s.snapshots.Add(peerID, snapshot)
     if err != nil {
         return false, err
diff --git a/light/client.go b/light/client.go
index 52bbdf981..651756333 100644
--- a/light/client.go
+++ b/light/client.go
@@ -989,7 +989,7 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
     c.providerMutex.Lock()
     defer c.providerMutex.Unlock()
 
-    if len(c.witnesses) <= 1 {
+    if len(c.witnesses) < 1 {
         return nil, ErrNoWitnesses
     }
 
diff --git a/node/node.go b/node/node.go
index b0b35aa31..cb5b07d56 100644
--- a/node/node.go
+++ b/node/node.go
@@ -333,6 +333,8 @@ func makeNode(config *cfg.Config,
     }
 
     stateSyncReactor = statesync.NewReactor(
+        genDoc.ChainID,
+        genDoc.InitialHeight,
         *config.StateSync,
         stateSyncReactorShim.Logger,
         proxyApp.Snapshot(),
diff --git a/test/e2e/networks/simple.toml b/test/e2e/networks/simple.toml
index e98514188..2efb30c09 100644
--- a/test/e2e/networks/simple.toml
+++ b/test/e2e/networks/simple.toml
@@ -5,8 +5,6 @@ snapshot_interval = 3
 [node.validator03]
 snapshot_interval = 3
 [node.validator04]
-snapshot_interval =3
-[node.validator05]
 state_sync = true
 start_at = 5
 persistent_peers = ["validator01", "validator02"]
\ No newline at end of file
diff --git a/test/e2e/runner/main.go b/test/e2e/runner/main.go
index cb3d7d6bc..5b1051c01 100644
--- a/test/e2e/runner/main.go
+++ b/test/e2e/runner/main.go
@@ -168,12 +168,21 @@ func NewCLI() *CLI {
         },
     })
 
+    cli.root.AddCommand(&cobra.Command{
+        Use:   "pause",
+        Short: "Pauses the Docker testnet",
+        RunE: func(cmd *cobra.Command, args []string) error {
+            logger.Info("Pausing testnet")
+            return execCompose(cli.testnet.Dir, "pause")
+        },
+    })
+
     cli.root.AddCommand(&cobra.Command{
         Use:   "resume",
         Short: "Resumes the Docker testnet",
         RunE: func(cmd *cobra.Command, args []string) error {
             logger.Info("Resuming testnet")
-            return execCompose(cli.testnet.Dir, "up")
+            return execCompose(cli.testnet.Dir, "unpause")
         },
     })
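On the light/client.go hunk above: the old guard len(c.witnesses) <= 1 meant a client that was down to a single witness could never promote it to a new primary, while the relaxed < 1 only requires a non-empty witness list — which matters once the p2p state provider may be running with very few providers. A toy model of the promote-and-swap step the guard protects (the real findNewPrimary also handles witness removal and concurrent bookkeeping):

    package main

    import (
        "errors"
        "fmt"
    )

    var errNoWitnesses = errors.New("no witnesses connected")

    // findNewPrimary promotes the first witness to primary. Under the old guard
    // (len(witnesses) <= 1) a client holding exactly one witness could never
    // recover a primary; requiring only a non-empty list fixes that.
    func findNewPrimary(witnesses []string) (string, []string, error) {
        if len(witnesses) < 1 {
            return "", nil, errNoWitnesses
        }
        return witnesses[0], witnesses[1:], nil
    }

    func main() {
        primary, rest, err := findNewPrimary([]string{"node-b"})
        fmt.Println(primary, rest, err) // node-b [] <nil> — allowed under the new guard
    }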