diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go
index 42497708e..daaa53632 100644
--- a/internal/statesync/dispatcher.go
+++ b/internal/statesync/dispatcher.go
@@ -66,6 +66,7 @@ func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.No
 		}
 	}()

+	fmt.Println("awaiting a response")
 	// wait for a response, cancel or timeout
 	select {
 	case resp := <-callCh:
@@ -187,10 +188,12 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
 			return nil, provider.ErrLightBlockNotFound
 		}
 	case context.DeadlineExceeded, context.Canceled:
+		fmt.Println("context canceled")
 		return nil, err
 	case errPeerAlreadyBusy:
 		return nil, provider.ErrLightBlockNotFound
 	case errNoResponse:
+		fmt.Println("no response")
 		return nil, provider.ErrNoResponse
 	default: // errDisconnected
 		return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go
index a319dd952..af44883d3 100644
--- a/internal/statesync/dispatcher_test.go
+++ b/internal/statesync/dispatcher_test.go
@@ -41,7 +41,7 @@ func TestDispatcherBasic(t *testing.T) {
 		wg.Add(1)
 		go func(height int64) {
 			defer wg.Done()
-			lb, err := d.LightBlock(context.Background(), height, peers[height - 1])
+			lb, err := d.LightBlock(context.Background(), height, peers[height-1])
 			require.NoError(t, err)
 			require.NotNil(t, lb)
 			require.Equal(t, lb.Height, height)
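The dispatcher change above hinges on a three-way select: a response channel, context cancellation, and a timeout. A minimal, self-contained sketch of that pattern follows; callCh, lightBlock, and errNoResponse are illustrative stand-ins, not the dispatcher's actual internals.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type lightBlock struct{ Height int64 } // stand-in for types.LightBlock

var errNoResponse = errors.New("no response from peer")

// awaitResponse waits for a value on callCh, a context cancellation, or a
// timeout, mirroring the select at the heart of Dispatcher.LightBlock.
func awaitResponse(ctx context.Context, callCh <-chan *lightBlock, timeout time.Duration) (*lightBlock, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case resp := <-callCh:
		return resp, nil
	case <-ctx.Done():
		// surfaces context.Canceled or context.DeadlineExceeded to the caller
		return nil, ctx.Err()
	case <-timer.C:
		return nil, errNoResponse
	}
}

func main() {
	callCh := make(chan *lightBlock, 1)
	callCh <- &lightBlock{Height: 1}
	lb, err := awaitResponse(context.Background(), callCh, time.Second)
	fmt.Println(lb.Height, err)
}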
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index 75575388c..c15b242f2 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -63,7 +63,7 @@ var (
 		MsgType: new(ssproto.Message),
 		Descriptor: &p2p.ChannelDescriptor{
 			ID:                  byte(LightBlockChannel),
-			Priority:            1,
+			Priority:            11,
 			SendQueueCapacity:   10,
 			RecvMessageCapacity: lightBlockMsgSize,
 			RecvBufferCapacity:  128,
@@ -114,7 +114,7 @@ const (

 	// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
 	// return a light block
-	lightBlockResponseTimeout = 30 * time.Second
+	lightBlockResponseTimeout = 10 * time.Second

 	// consensusParamsResponseTimeout is the time the p2p state provider waits
 	// before performing a secondary call
@@ -130,11 +130,11 @@ const (
 type Reactor struct {
 	service.BaseService

-	chainID string
+	chainID       string
 	initialHeight int64
-	cfg config.StateSyncConfig
-	stateStore sm.Store
-	blockStore *store.BlockStore
+	cfg           config.StateSyncConfig
+	stateStore    sm.Store
+	blockStore    *store.BlockStore

 	conn      proxy.AppConnSnapshot
 	connQuery proxy.AppConnQuery
@@ -154,9 +154,9 @@ type Reactor struct {
 	// These will only be set when a state sync is in progress. It is used to feed
 	// received snapshots and chunks into the syncer and manage incoming and outgoing
 	// providers.
-	mtx tmsync.RWMutex
-	syncer *syncer
-	providers map[types.NodeID]*blockProvider
+	mtx           tmsync.RWMutex
+	syncer        *syncer
+	providers     map[types.NodeID]*blockProvider
 	stateProvider StateProvider
 }

@@ -178,7 +178,7 @@ func NewReactor(
 	tempDir string,
 ) *Reactor {
 	r := &Reactor{
-		chainID: chainID,
+		chainID:       chainID,
 		cfg:           cfg,
 		conn:          conn,
 		connQuery:     connQuery,
@@ -242,6 +242,7 @@ func (r *Reactor) OnStop() {
 	<-r.snapshotCh.Done()
 	<-r.chunkCh.Done()
 	<-r.blockCh.Done()
+	<-r.paramsCh.Done()
 	<-r.peerUpdates.Done()
 }

@@ -250,19 +251,17 @@
 // store and persist the commit at that height so that either consensus or
 // blocksync can commence. It will then proceed to backfill the necessary amount
 // of historical blocks before participating in consensus
-func (r *Reactor) Sync(
-	ctx context.Context,
-	chainID string,
-	initialHeight int64,
-) (sm.State, error) {
-	r.waitForEnoughPeers(ctx, 3)
+func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
+	r.waitForEnoughPeers(ctx, 2)
 	r.mtx.Lock()
 	if r.syncer != nil {
 		r.mtx.Unlock()
 		return sm.State{}, errors.New("a state sync is already in progress")
 	}

-	r.initStateProvider(ctx, chainID, initialHeight)
+	if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
+		return sm.State{}, err
+	}

 	r.syncer = newSyncer(
 		r.cfg,
@@ -979,7 +978,7 @@ func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initial
 	if r.cfg.UseP2P {
 		spLogger.Info("Generating P2P state provider")
-		
+
 		peers := r.peers.All()
 		providers := make([]provider.Provider, len(peers))
 		for idx, p := range peers {
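The reworked Sync now calls r.waitForEnoughPeers(ctx, 2) before anything else. That helper is not part of this diff; the sketch below shows one plausible shape for it, polling a peer count until the threshold is met or the context ends (waitForPeers, peerCount, and the poll interval are assumptions).

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForPeers polls peerCount until it reaches numPeers or ctx ends.
// This sketches what a waitForEnoughPeers-style helper might do; the real
// method lives on the Reactor and is not shown in this diff.
func waitForPeers(ctx context.Context, peerCount func() int, numPeers int) error {
	t := time.NewTicker(200 * time.Millisecond)
	defer t.Stop()
	for peerCount() < numPeers {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
		}
	}
	return nil
}

func main() {
	n := 0
	err := waitForPeers(context.Background(), func() int { n++; return n }, 2)
	fmt.Println("done waiting:", err)
}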
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index f7c913a1d..dca0013d0 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/tendermint/tendermint/internal/statesync/mocks"
 	"github.com/tendermint/tendermint/internal/test/factory"
 	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/proxy"
 	"github.com/tendermint/tendermint/light/provider"

 	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -159,7 +161,8 @@ func setup(
 	)

 	// override the dispatcher with one with a shorter timeout
-	rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out, 1*time.Second)
+	rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out,
+		100*time.Millisecond)

 	rts.syncer = newSyncer(
 		*cfg,
@@ -183,6 +186,57 @@ func setup(
 	return rts
 }

+func TestReactor_Sync(t *testing.T) {
+	var snapshotHeight int64 = 7
+	rts := setup(t, nil, nil, nil, 2)
+	chain := buildLightBlockChain(t, 1, 10, time.Now())
+	// app accepts any snapshot
+	rts.conn.On("OfferSnapshotSync", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
+		Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
+
+	// app accepts every chunk
+	rts.conn.On("ApplySnapshotChunkSync", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
+		Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
+
+	// app query returns valid state app hash
+	rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
+		AppVersion:       9,
+		LastBlockHeight:  snapshotHeight,
+		LastBlockAppHash: chain[snapshotHeight+1].AppHash,
+	}, nil)
+
+	// store accepts state and validator sets
+	rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
+	rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
+		mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
+
+	closeCh := make(chan struct{})
+	defer close(closeCh)
+	go handleLightBlockRequests(t, chain, rts.blockOutCh,
+		rts.blockInCh, closeCh, 0)
+	go graduallyAddPeers(rts.peerUpdateCh, closeCh, 1*time.Second)
+	go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
+		{
+			Height: uint64(snapshotHeight),
+			Format: 1,
+			Chunks: 1,
+		},
+	})
+
+	go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
+
+	go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
+
+	// update the config to use the p2p provider
+	rts.reactor.cfg.UseP2P = true
+	rts.reactor.cfg.TrustHeight = 1
+	rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
+	rts.reactor.cfg.DiscoveryTime = 2 * time.Second
+
+	_, err := rts.reactor.Sync(context.Background())
+	require.NoError(t, err)
+}
+
 func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
 	rts := setup(t, nil, nil, nil, 2)

@@ -688,3 +742,83 @@ func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockI
 		ValidatorSet: currentVals,
 	}
 }
+
+func graduallyAddPeers(
+	peerUpdateCh chan p2p.PeerUpdate,
+	closeCh chan struct{},
+	interval time.Duration,
+) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			fmt.Println("adding new peer")
+			peerUpdateCh <- p2p.PeerUpdate{
+				NodeID: factory.RandomNodeID(),
+				Status: p2p.PeerStatusUp,
+			}
+		case <-closeCh:
+			return
+		}
+	}
+}
+
+func handleSnapshotRequests(
+	t *testing.T,
+	receivingCh chan p2p.Envelope,
+	sendingCh chan p2p.Envelope,
+	closeCh chan struct{},
+	snapshots []snapshot,
+) {
+	for {
+		select {
+		case envelope := <-receivingCh:
+			_, ok := envelope.Message.(*ssproto.SnapshotsRequest)
+			require.True(t, ok)
+			for _, snapshot := range snapshots {
+				sendingCh <- p2p.Envelope{
+					From: envelope.To,
+					Message: &ssproto.SnapshotsResponse{
+						Height:   snapshot.Height,
+						Format:   snapshot.Format,
+						Chunks:   snapshot.Chunks,
+						Hash:     snapshot.Hash,
+						Metadata: snapshot.Metadata,
+					},
+				}
+			}
+		case <-closeCh:
+			return
+		}
+	}
+}
+
+func handleChunkRequests(
+	t *testing.T,
+	receivingCh chan p2p.Envelope,
+	sendingCh chan p2p.Envelope,
+	closeCh chan struct{},
+	chunk []byte,
+) {
+	for {
+		select {
+		case envelope := <-receivingCh:
+			msg, ok := envelope.Message.(*ssproto.ChunkRequest)
+			require.True(t, ok)
+			sendingCh <- p2p.Envelope{
+				From: envelope.To,
+				Message: &ssproto.ChunkResponse{
+					Height:  msg.Height,
+					Format:  msg.Format,
+					Index:   msg.Index,
+					Chunk:   chunk,
+					Missing: false,
+				},
+			}
+
+		case <-closeCh:
+			return
+		}
+	}
+}
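TestReactor_Sync also launches handleLightBlockRequests and handleConsensusParamsRequest, which are defined elsewhere in the package. As a sketch in the same style as handleSnapshotRequests above, a consensus-params responder might look like the following; the ssproto.ParamsRequest and ssproto.ParamsResponse field names are assumptions based on the statesync proto, and the function name is hypothetical.

// handleConsensusParamsRequestSketch answers params requests with the
// default consensus params, mirroring the snapshot/chunk helpers above.
// It is a sketch: the real helper is defined elsewhere in the package.
func handleConsensusParamsRequestSketch(
	t *testing.T,
	receivingCh chan p2p.Envelope,
	sendingCh chan p2p.Envelope,
	closeCh chan struct{},
) {
	params := types.DefaultConsensusParams().ToProto()
	for {
		select {
		case envelope := <-receivingCh:
			msg, ok := envelope.Message.(*ssproto.ParamsRequest)
			require.True(t, ok)
			sendingCh <- p2p.Envelope{
				From: envelope.To,
				Message: &ssproto.ParamsResponse{
					Height:          msg.Height,
					ConsensusParams: params,
				},
			}
		case <-closeCh:
			return
		}
	}
}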
diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go
index babaf482f..1e23088f4 100644
--- a/internal/statesync/stateprovider.go
+++ b/internal/statesync/stateprovider.go
@@ -87,6 +87,7 @@ func NewRPCStateProvider(

 // AppHash implements StateProvider.
 func (s *stateProviderRPC) AppHash(ctx context.Context, height uint64) ([]byte, error) {
+	fmt.Println("requesting app hash")
 	s.Lock()
 	defer s.Unlock()

@@ -333,7 +334,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
 	return state, nil
 }

-// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a 
+// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a
 // heuristic. Too many overburdens the network and too little compromises the second layer of security.
 func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
 	if len(s.lc.Witnesses()) < 6 {
@@ -350,9 +351,11 @@ func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (t
 	if !ok {
 		panic("expected p2p state provider to use p2p block providers")
 	}
+
+	// extract the nodeID of the provider
 	peer, err := types.NewNodeID(p.String())
 	if err != nil {
-		return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w", err)
+		return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w, provider: %s", err, p.String())
 	}

 	select {
diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go
index 2a1f45e63..e93e49990 100644
--- a/internal/statesync/syncer.go
+++ b/internal/statesync/syncer.go
@@ -40,7 +40,7 @@ var (
 	errRejectSender = errors.New("snapshot sender was rejected")
 	// errVerifyFailed is returned by Sync() when app hash or last height
 	// verification fails.
-	errVerifyFailed = errors.New("verification failed")
+	errVerifyFailed = errors.New("verification with app failed")
 	// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
 	errTimeout = errors.New("timed out waiting for chunk")
 	// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
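The consensusParams hunk now attaches the offending provider string to the error. Pulled out as a standalone helper, the conversion reads as follows; providerID is a hypothetical name, while the body mirrors the hunk above.

// providerID converts a light provider's string form into a types.NodeID,
// wrapping failures with the offending value, as the hunk above now does.
func providerID(p lightprovider.Provider) (types.NodeID, error) {
	peer, err := types.NewNodeID(p.String())
	if err != nil {
		return "", fmt.Errorf("invalid provider node id: %w, provider: %s", err, p.String())
	}
	return peer, nil
}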
diff --git a/internal/test/factory/factory_test.go b/internal/test/factory/factory_test.go
index 25f234508..07a3ef8b3 100644
--- a/internal/test/factory/factory_test.go
+++ b/internal/test/factory/factory_test.go
@@ -12,3 +12,7 @@ func TestMakeHeader(t *testing.T) {
 	_, err := MakeHeader(&types.Header{})
 	assert.NoError(t, err)
 }
+
+func TestRandomNodeID(t *testing.T) {
+	assert.NotPanics(t, func() { RandomNodeID() })
+}
diff --git a/internal/test/factory/p2p.go b/internal/test/factory/p2p.go
new file mode 100644
index 000000000..36ea24ba1
--- /dev/null
+++ b/internal/test/factory/p2p.go
@@ -0,0 +1,27 @@
+package factory
+
+import (
+	"encoding/hex"
+	"strings"
+
+	"github.com/tendermint/tendermint/libs/rand"
+	"github.com/tendermint/tendermint/types"
+)
+
+// NodeID builds a NodeID by repeating str to the expected hex length; it panics on invalid input.
+func NodeID(str string) types.NodeID {
+	id, err := types.NewNodeID(strings.Repeat(str, 2*types.NodeIDByteLength))
+	if err != nil {
+		panic(err)
+	}
+	return id
+}
+
+// RandomNodeID returns a randomly generated NodeID; it panics on error.
+func RandomNodeID() types.NodeID {
+	id, err := types.NewNodeID(hex.EncodeToString(rand.Bytes(types.NodeIDByteLength)))
+	if err != nil {
+		panic(err)
+	}
+	return id
+}
diff --git a/node/node.go b/node/node.go
index cb5b07d56..249101a11 100644
--- a/node/node.go
+++ b/node/node.go
@@ -682,8 +682,8 @@ func (n *nodeImpl) OnStart() error {
 		// FIXME: We shouldn't allow state sync to silently error out without
 		// bubbling up the error and gracefully shutting down the rest of the node
 		go func() {
-			n.Logger.Info("staring state sync")
-			state, err := n.stateSyncReactor.Sync(context.TODO(), state.ChainID, state.InitialHeight)
+			n.Logger.Info("starting state sync")
+			state, err := n.stateSyncReactor.Sync(context.TODO())
 			if err != nil {
 				n.Logger.Error("state sync failed", "err", err)
 				return
diff --git a/test/e2e/networks/simple.toml b/test/e2e/networks/simple.toml
index 2efb30c09..e0d906bde 100644
--- a/test/e2e/networks/simple.toml
+++ b/test/e2e/networks/simple.toml
@@ -1,10 +1,8 @@
 [node.validator01]
-snapshot_interval = 2
+snapshot_interval = 3
 [node.validator02]
 snapshot_interval = 3
 [node.validator03]
-snapshot_interval = 3
-[node.validator04]
 state_sync = true
 start_at = 5
 persistent_peers = ["validator01", "validator02"]
\ No newline at end of file
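A short usage sketch for the new internal/test/factory helpers added above: NodeID deterministically repeats a single hex character, while RandomNodeID draws fresh random bytes. The test name and assertions are illustrative, not part of the diff.

func TestNodeIDFactories(t *testing.T) {
	fixed := factory.NodeID("a") // "aaa…": one hex character repeated to full length
	random := factory.RandomNodeID()

	// both are hex strings of 2*NodeIDByteLength characters
	require.Len(t, string(fixed), 2*types.NodeIDByteLength)
	require.Len(t, string(random), 2*types.NodeIDByteLength)
}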