package statesync

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"time"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/libs/log"
	tmsync "github.com/tendermint/tendermint/libs/sync"
	"github.com/tendermint/tendermint/p2p"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

const (
	// chunkFetchers is the number of concurrent chunk fetchers to run.
	chunkFetchers = 4
	// chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
	chunkTimeout = 2 * time.Minute
	// chunkRequestTimeout is the timeout before re-requesting a chunk, possibly from a different peer.
	chunkRequestTimeout = 10 * time.Second
)

var (
	// errAbort is returned by Sync() when snapshot restoration is aborted.
	errAbort = errors.New("state sync aborted")
	// errRetrySnapshot is returned by Sync() when the snapshot should be retried.
	errRetrySnapshot = errors.New("retry snapshot")
	// errRejectSnapshot is returned by Sync() when the snapshot is rejected.
	errRejectSnapshot = errors.New("snapshot was rejected")
	// errRejectFormat is returned by Sync() when the snapshot format is rejected.
	errRejectFormat = errors.New("snapshot format was rejected")
	// errRejectSender is returned by Sync() when the snapshot sender is rejected.
	errRejectSender = errors.New("snapshot sender was rejected")
	// errVerifyFailed is returned by Sync() when app hash or last height verification fails.
	errVerifyFailed = errors.New("verification failed")
	// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
	errTimeout = errors.New("timed out waiting for chunk")
	// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
	errNoSnapshots = errors.New("no suitable snapshots found")
)

// syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
// sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
// snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
type syncer struct {
	logger        log.Logger
	stateProvider StateProvider
	conn          proxy.AppConnSnapshot
	connQuery     proxy.AppConnQuery
	snapshots     *snapshotPool
	snapshotCh    chan<- p2p.Envelope
	chunkCh       chan<- p2p.Envelope
	tempDir       string

	mtx    tmsync.RWMutex
	chunks *chunkQueue
}

// newSyncer creates a new syncer.
func newSyncer(
	logger log.Logger,
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	stateProvider StateProvider,
	snapshotCh, chunkCh chan<- p2p.Envelope,
	tempDir string,
) *syncer {
	return &syncer{
		logger:        logger,
		stateProvider: stateProvider,
		conn:          conn,
		connQuery:     connQuery,
		snapshots:     newSnapshotPool(stateProvider),
		snapshotCh:    snapshotCh,
		chunkCh:       chunkCh,
		tempDir:       tempDir,
	}
}

// AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
// been added to the queue, or an error if there's no sync in progress.
func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	if s.chunks == nil {
		return false, errors.New("no state sync in progress")
	}
	added, err := s.chunks.Add(chunk)
	if err != nil {
		return false, err
	}
	if added {
		s.logger.Debug("Added chunk to queue", "height", chunk.Height, "format", chunk.Format,
			"chunk", chunk.Index)
	} else {
		s.logger.Debug("Ignoring duplicate chunk in queue", "height", chunk.Height, "format", chunk.Format,
			"chunk", chunk.Index)
	}
	return added, nil
}

// AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
// snapshot was accepted and added.
func (s *syncer) AddSnapshot(peerID p2p.NodeID, snapshot *snapshot) (bool, error) {
	added, err := s.snapshots.Add(peerID, snapshot)
	if err != nil {
		return false, err
	}
	if added {
		s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
			"hash", snapshot.Hash)
	}
	return added, nil
}

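/*
Sketch (illustrative only): AddSnapshot relies on the snapshot pool reporting
whether a snapshot is new. The program below is a minimal, hypothetical
deduplicating pool; it assumes snapshots can be keyed by height, format and hash,
and that each key remembers the set of peers advertising it. The real
snapshotPool also ranks snapshots and tracks rejections, which this sketch omits.

```go
package main

import (
	"fmt"
	"sync"
)

// snapshotKey identifies a snapshot; keying on height, format and hash is an
// assumption made for this sketch.
type snapshotKey struct {
	height uint64
	format uint32
	hash   string
}

// pool is a hypothetical deduplicating snapshot pool.
type pool struct {
	mtx   sync.Mutex
	peers map[snapshotKey]map[string]bool // snapshot -> set of peer IDs
}

func newPool() *pool {
	return &pool{peers: make(map[snapshotKey]map[string]bool)}
}

// Add records that peerID advertises key. It returns true only the first time
// key is seen from any peer, so callers log "new snapshot" exactly once.
func (p *pool) Add(peerID string, key snapshotKey) bool {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	set, ok := p.peers[key]
	if !ok {
		set = make(map[string]bool)
		p.peers[key] = set
	}
	set[peerID] = true
	return !ok
}

// PeerCount returns how many distinct peers advertised key.
func (p *pool) PeerCount(key snapshotKey) int {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return len(p.peers[key])
}

func main() {
	p := newPool()
	k := snapshotKey{height: 100, format: 1, hash: "abc"}
	fmt.Println(p.Add("peerA", k)) // true: first sighting
	fmt.Println(p.Add("peerB", k)) // false: known snapshot, new peer
	fmt.Println(p.PeerCount(k))    // 2
}
```

Tracking peers per snapshot, rather than a flat set of snapshots, is what lets a
pool later hand out several sources for the same snapshot's chunks.
*/
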
// AddPeer adds a peer to the pool. For now we just keep it simple and send a
// single request to discover snapshots; later we may want to add retries.
func (s *syncer) AddPeer(peerID p2p.NodeID) {
	s.logger.Debug("Requesting snapshots from peer", "peer", peerID)
	s.snapshotCh <- p2p.Envelope{
		To:      peerID,
		Message: &ssproto.SnapshotsRequest{},
	}
}

// RemovePeer removes a peer from the pool.
func (s *syncer) RemovePeer(peerID p2p.NodeID) {
	s.logger.Debug("Removing peer from sync", "peer", peerID)
	s.snapshots.RemovePeer(peerID)
}

// SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
// snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
// which the caller must use to bootstrap the node.
func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
	if discoveryTime > 0 {
		s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
		time.Sleep(discoveryTime)
	}

	// The app may ask us to retry a snapshot restoration, in which case we need to reuse
	// the snapshot and chunk queue from the previous loop iteration.
	var (
		snapshot *snapshot
		chunks   *chunkQueue
		err      error
	)
	for {
		// If not nil, we're going to retry restoration of the same snapshot.
		if snapshot == nil {
			snapshot = s.snapshots.Best()
			chunks = nil
		}
		if snapshot == nil {
			if discoveryTime == 0 {
				return sm.State{}, nil, errNoSnapshots
			}
			s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
			time.Sleep(discoveryTime)
			continue
		}
		if chunks == nil {
			chunks, err = newChunkQueue(snapshot, s.tempDir)
			if err != nil {
				return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
			}
			defer chunks.Close() // in case we forget to close it elsewhere
		}

		newState, commit, err := s.Sync(snapshot, chunks)
		switch {
		case err == nil:
			return newState, commit, nil

		case errors.Is(err, errAbort):
			return sm.State{}, nil, err

		case errors.Is(err, errRetrySnapshot):
			chunks.RetryAll()
			s.logger.Info("Retrying snapshot", "height", snapshot.Height, "format", snapshot.Format,
				"hash", snapshot.Hash)
			continue

		case errors.Is(err, errTimeout):
			s.snapshots.Reject(snapshot)
			s.logger.Error("Timed out waiting for snapshot chunks, rejected snapshot",
				"height", snapshot.Height, "format", snapshot.Format, "hash", snapshot.Hash)

		case errors.Is(err, errRejectSnapshot):
			s.snapshots.Reject(snapshot)
			s.logger.Info("Snapshot rejected", "height", snapshot.Height, "format", snapshot.Format,
				"hash", snapshot.Hash)

		case errors.Is(err, errRejectFormat):
			s.snapshots.RejectFormat(snapshot.Format)
			s.logger.Info("Snapshot format rejected", "format", snapshot.Format)

		case errors.Is(err, errRejectSender):
			s.logger.Info("Snapshot senders rejected", "height", snapshot.Height, "format", snapshot.Format,
				"hash", snapshot.Hash)
			for _, peer := range s.snapshots.GetPeers(snapshot) {
				s.snapshots.RejectPeer(peer)
				s.logger.Info("Snapshot sender rejected", "peer", peer)
			}

		default:
			return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
		}

		// Discard snapshot and chunks for next iteration
		err = chunks.Close()
		if err != nil {
			s.logger.Error("Failed to clean up chunk queue", "err", err)
		}
		snapshot = nil
		chunks = nil
	}
}

// Sync executes a sync for a specific snapshot, returning the latest state and block commit which
// the caller must use to bootstrap the node.
func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
	s.mtx.Lock()
	if s.chunks != nil {
		s.mtx.Unlock()
		return sm.State{}, nil, errors.New("a state sync is already in progress")
	}
	s.chunks = chunks
	s.mtx.Unlock()
	defer func() {
		s.mtx.Lock()
		s.chunks = nil
		s.mtx.Unlock()
	}()

	// Offer snapshot to ABCI app.
	err := s.offerSnapshot(snapshot)
	if err != nil {
		return sm.State{}, nil, err
	}

	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for i := int32(0); i < chunkFetchers; i++ {
		go s.fetchChunks(ctx, snapshot, chunks)
	}

	pctx, pcancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer pcancel()

	// Optimistically build new state, so we don't discover any light client failures at the end.
	state, err := s.stateProvider.State(pctx, snapshot.Height)
	if err != nil {
		return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
	}
	commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
	if err != nil {
		return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
	}

	// Restore snapshot
	err = s.applyChunks(chunks)
	if err != nil {
		return sm.State{}, nil, err
	}

	// Verify app and update app version
	appVersion, err := s.verifyApp(snapshot)
	if err != nil {
		return sm.State{}, nil, err
	}
	state.Version.Consensus.App = appVersion

	// Done! 🎉
	s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
		"hash", snapshot.Hash)

	return state, commit, nil
}

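/*
Sketch (illustrative only): Sync installs its chunk queue in s.chunks under the
write lock, refuses to start if a queue is already installed, and clears it in a
deferred function, while AddChunk only takes the read lock to look the queue up.
The program below reduces that guard to a hypothetical session type, with a small
self-locking queue standing in for *chunkQueue. It sketches the pattern and is
not this package's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for *chunkQueue with its own lock.
type queue struct {
	mtx   sync.Mutex
	items []string
}

func (q *queue) push(item string) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.items = append(q.items, item)
}

func (q *queue) size() int {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	return len(q.items)
}

// session is a hypothetical reduction of syncer's mtx/chunks pair.
type session struct {
	mtx    sync.RWMutex
	active *queue // nil when no sync is running
}

// run installs q for the duration of work and refuses to start a second run.
func (s *session) run(q *queue, work func() error) error {
	s.mtx.Lock()
	if s.active != nil {
		s.mtx.Unlock()
		return errors.New("a state sync is already in progress")
	}
	s.active = q
	s.mtx.Unlock()
	defer func() {
		s.mtx.Lock()
		s.active = nil
		s.mtx.Unlock()
	}()
	return work()
}

// add only reads s.active, so a read lock suffices; the queue guards its own
// contents.
func (s *session) add(item string) error {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	if s.active == nil {
		return errors.New("no state sync in progress")
	}
	s.active.push(item)
	return nil
}

func main() {
	s := &session{}
	fmt.Println(s.add("chunk-0")) // no state sync in progress

	q := &queue{}
	err := s.run(q, func() error {
		if err := s.add("chunk-0"); err != nil {
			return err
		}
		// A nested run is refused while the first one is active.
		return s.run(&queue{}, func() error { return nil })
	})
	fmt.Println(err, q.size()) // a state sync is already in progress 1
}
```

Releasing the write lock before calling work is what allows add (and, in the
real code, AddChunk from the reactor) to proceed while a restore is running.
*/
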
// offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
// response, or nil if the snapshot was accepted.
func (s *syncer) offerSnapshot(snapshot *snapshot) error {
	s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
		"format", snapshot.Format, "hash", snapshot.Hash)
	resp, err := s.conn.OfferSnapshotSync(context.Background(), abci.RequestOfferSnapshot{
		Snapshot: &abci.Snapshot{
			Height:   snapshot.Height,
			Format:   snapshot.Format,
			Chunks:   snapshot.Chunks,
			Hash:     snapshot.Hash,
			Metadata: snapshot.Metadata,
		},
		AppHash: snapshot.trustedAppHash,
	})
	if err != nil {
		return fmt.Errorf("failed to offer snapshot: %w", err)
	}
	switch resp.Result {
	case abci.ResponseOfferSnapshot_ACCEPT:
		s.logger.Info("Snapshot accepted, restoring", "height", snapshot.Height,
			"format", snapshot.Format, "hash", snapshot.Hash)
		return nil
	case abci.ResponseOfferSnapshot_ABORT:
		return errAbort
	case abci.ResponseOfferSnapshot_REJECT:
		return errRejectSnapshot
	case abci.ResponseOfferSnapshot_REJECT_FORMAT:
		return errRejectFormat
	case abci.ResponseOfferSnapshot_REJECT_SENDER:
		return errRejectSender
	default:
		return fmt.Errorf("unknown ResponseOfferSnapshot result %v", resp.Result)
	}
}

// applyChunks applies chunks to the app. It returns various errors depending on the app's
// response, or nil once the snapshot is fully restored.
func (s *syncer) applyChunks(chunks *chunkQueue) error {
	for {
		chunk, err := chunks.Next()
		if err == errDone {
			return nil
		} else if err != nil {
			return fmt.Errorf("failed to fetch chunk: %w", err)
		}

		resp, err := s.conn.ApplySnapshotChunkSync(context.Background(), abci.RequestApplySnapshotChunk{
			Index:  chunk.Index,
			Chunk:  chunk.Chunk,
			Sender: string(chunk.Sender),
		})
		if err != nil {
			return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err)
		}
		s.logger.Info("Applied snapshot chunk to ABCI app", "height", chunk.Height,
			"format", chunk.Format, "chunk", chunk.Index, "total", chunks.Size())

		// Discard and refetch any chunks as requested by the app
		for _, index := range resp.RefetchChunks {
			err := chunks.Discard(index)
			if err != nil {
				return fmt.Errorf("failed to discard chunk %v: %w", index, err)
			}
		}

		// Reject any senders as requested by the app
		for _, sender := range resp.RejectSenders {
			if sender != "" {
				peerID := p2p.NodeID(sender)
				s.snapshots.RejectPeer(peerID)

				if err := chunks.DiscardSender(peerID); err != nil {
					return fmt.Errorf("failed to reject sender: %w", err)
				}
			}
		}

		switch resp.Result {
		case abci.ResponseApplySnapshotChunk_ACCEPT:
		case abci.ResponseApplySnapshotChunk_ABORT:
			return errAbort
		case abci.ResponseApplySnapshotChunk_RETRY:
			chunks.Retry(chunk.Index)
		case abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT:
			return errRetrySnapshot
		case abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT:
			return errRejectSnapshot
		default:
			return fmt.Errorf("unknown ResponseApplySnapshotChunk result %v", resp.Result)
		}
	}
}

// fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
// will be received from the reactor via syncer.AddChunk() to chunkQueue.Add().
func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
	for {
		index, err := chunks.Allocate()
		if err == errDone {
			// Keep checking until the context is cancelled (restore is done), in case any
			// chunks need to be refetched.
			select {
			case <-ctx.Done():
				return
			default:
			}
			time.Sleep(2 * time.Second)
			continue
		}
		if err != nil {
			s.logger.Error("Failed to allocate chunk from queue", "err", err)
			return
		}
		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
			"format", snapshot.Format, "chunk", index, "total", chunks.Size())

		// Request the chunk, then re-request it (possibly from a different peer) every
		// chunkRequestTimeout until it arrives or the context is cancelled. The ticker is
		// stopped explicitly rather than deferred, since a defer inside this loop would
		// only run when the function returns.
		ticker := time.NewTicker(chunkRequestTimeout)
		received := chunks.WaitFor(index)
		s.requestChunk(snapshot, index)
	wait:
		for {
			select {
			case <-received:
				break wait
			case <-ticker.C:
				s.requestChunk(snapshot, index)
			case <-ctx.Done():
				ticker.Stop()
				return
			}
		}
		ticker.Stop()
	}
}

// requestChunk requests a chunk from a peer.
func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
	peer := s.snapshots.GetPeer(snapshot)
	if peer == "" {
		s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
			"format", snapshot.Format, "hash", snapshot.Hash)
		return
	}

	s.logger.Debug(
		"Requesting snapshot chunk",
		"height", snapshot.Height,
		"format", snapshot.Format,
		"chunk", chunk,
		"peer", peer,
	)

	s.chunkCh <- p2p.Envelope{
		To: peer,
		Message: &ssproto.ChunkRequest{
			Height: snapshot.Height,
			Format: snapshot.Format,
			Index:  chunk,
		},
	}
}

// verifyApp verifies the sync, checking the app hash and last block height. It returns the
// app version, which should be returned as part of the initial state.
func (s *syncer) verifyApp(snapshot *snapshot) (uint64, error) {
	resp, err := s.connQuery.InfoSync(context.Background(), proxy.RequestInfo)
	if err != nil {
		return 0, fmt.Errorf("failed to query ABCI app for appHash: %w", err)
	}

	if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
		s.logger.Error("appHash verification failed",
			"expected", snapshot.trustedAppHash,
			"actual", resp.LastBlockAppHash)
		return 0, errVerifyFailed
	}

	if uint64(resp.LastBlockHeight) != snapshot.Height {
		s.logger.Error(
			"ABCI app reported unexpected last block height",
			"expected", snapshot.Height,
			"actual", resp.LastBlockHeight,
		)
		return 0, errVerifyFailed
	}

	s.logger.Info("Verified ABCI app", "height", snapshot.Height, "appHash", snapshot.trustedAppHash)
	return resp.AppVersion, nil
}