add state sync reactor (#4705)
Fixes #828. Adds state sync, as outlined in [ADR-053](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-053-state-sync-prototype.md). See related PRs in Cosmos SDK (https://github.com/cosmos/cosmos-sdk/pull/5803) and Gaia (https://github.com/cosmos/gaia/pull/327). This is split out of the previous PR #4645, and branched off of the ABCI interface in #4704.

* Adds a new P2P reactor which exchanges snapshots with peers, and bootstraps an empty local node from remote snapshots when requested.
* Adds a new configuration section `[statesync]` that enables state sync and configures the light client (see the configuration sketch below). Also enables `statesync:info` logging by default.
* Integrates state sync into node startup. The v2 blockchain reactor is not supported, since it needs some reorganization to defer startup.
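For reference, here is a minimal sketch (not part of this PR) of enabling the new `[statesync]` section programmatically. The field names follow the released Tendermint configuration and may not match this PR exactly; the heights, hashes, and hosts are placeholders.

package main

import (
	"time"

	"github.com/tendermint/tendermint/config"
)

func main() {
	cfg := config.DefaultConfig()

	// Enable the state sync reactor instead of replaying all blocks.
	cfg.StateSync.Enable = true

	// Light client trust options: a recent height and its header hash,
	// obtained out of band from a source the operator trusts.
	cfg.StateSync.TrustHeight = 1000
	cfg.StateSync.TrustHash = "<header hash at trust height>"
	cfg.StateSync.TrustPeriod = 168 * time.Hour

	// RPC endpoints the light client uses to verify snapshots against.
	cfg.StateSync.RPCServers = []string{"host1:26657", "host2:26657"}

	// cfg is then passed to node construction as usual.
}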
statesync/chunks.go (new file, 321 lines)
@@ -0,0 +1,321 @@
package statesync

import (
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/tendermint/tendermint/p2p"
)

// errDone is returned by chunkQueue.Next() when all chunks have been returned.
var errDone = errors.New("chunk queue has completed")

// chunk contains data for a chunk.
type chunk struct {
	Height uint64
	Format uint32
	Index  uint32
	Chunk  []byte
	Sender p2p.ID
}

// chunkQueue manages chunks for a state sync process, ordering them if requested. It acts as an
// iterator over all chunks, but callers can request chunks to be retried, optionally after
// refetching.
type chunkQueue struct {
	sync.Mutex
	snapshot       *snapshot                  // if this is nil, the queue has been closed
	dir            string                     // temp dir for on-disk chunk storage
	chunkFiles     map[uint32]string          // path to temporary chunk file
	chunkSenders   map[uint32]p2p.ID          // the peer who sent the given chunk
	chunkAllocated map[uint32]bool            // chunks that have been allocated via Allocate()
	chunkReturned  map[uint32]bool            // chunks returned via Next()
	waiters        map[uint32][]chan<- uint32 // signals WaitFor() waiters about chunk arrival
}

// newChunkQueue creates a new chunk queue for a snapshot, using a temp dir for storage.
// Callers must call Close() when done.
func newChunkQueue(snapshot *snapshot, tempDir string) (*chunkQueue, error) {
	dir, err := ioutil.TempDir(tempDir, "tm-statesync")
	if err != nil {
		return nil, fmt.Errorf("unable to create temp dir for state sync chunks: %w", err)
	}
	if snapshot.Chunks == 0 {
		return nil, errors.New("snapshot has no chunks")
	}
	return &chunkQueue{
		snapshot:       snapshot,
		dir:            dir,
		chunkFiles:     make(map[uint32]string, snapshot.Chunks),
		chunkSenders:   make(map[uint32]p2p.ID, snapshot.Chunks),
		chunkAllocated: make(map[uint32]bool, snapshot.Chunks),
		chunkReturned:  make(map[uint32]bool, snapshot.Chunks),
		waiters:        make(map[uint32][]chan<- uint32),
	}, nil
}

// Add adds a chunk to the queue. It ignores chunks that already exist, returning false.
func (q *chunkQueue) Add(chunk *chunk) (bool, error) {
	if chunk == nil || chunk.Chunk == nil {
		return false, errors.New("cannot add nil chunk")
	}
	q.Lock()
	defer q.Unlock()
	if q.snapshot == nil {
		return false, nil // queue is closed
	}
	if chunk.Height != q.snapshot.Height {
		return false, fmt.Errorf("invalid chunk height %v, expected %v", chunk.Height, q.snapshot.Height)
	}
	if chunk.Format != q.snapshot.Format {
		return false, fmt.Errorf("invalid chunk format %v, expected %v", chunk.Format, q.snapshot.Format)
	}
	if chunk.Index >= q.snapshot.Chunks {
		return false, fmt.Errorf("received unexpected chunk %v", chunk.Index)
	}
	if q.chunkFiles[chunk.Index] != "" {
		return false, nil
	}

	path := filepath.Join(q.dir, strconv.FormatUint(uint64(chunk.Index), 10))
	err := ioutil.WriteFile(path, chunk.Chunk, 0644)
	if err != nil {
		return false, fmt.Errorf("failed to save chunk %v to file %v: %w", chunk.Index, path, err)
	}
	q.chunkFiles[chunk.Index] = path
	q.chunkSenders[chunk.Index] = chunk.Sender

	// Signal any waiters that the chunk has arrived.
	for _, waiter := range q.waiters[chunk.Index] {
		waiter <- chunk.Index
		close(waiter)
	}
	delete(q.waiters, chunk.Index)

	return true, nil
}

// Allocate allocates a chunk to the caller, making it responsible for fetching it. Returns
// errDone once no chunks are left or the queue is closed.
func (q *chunkQueue) Allocate() (uint32, error) {
	q.Lock()
	defer q.Unlock()
	if q.snapshot == nil {
		return 0, errDone
	}
	if uint32(len(q.chunkAllocated)) >= q.snapshot.Chunks {
		return 0, errDone
	}
	for i := uint32(0); i < q.snapshot.Chunks; i++ {
		if !q.chunkAllocated[i] {
			q.chunkAllocated[i] = true
			return i, nil
		}
	}
	return 0, errDone
}

// Close closes the chunk queue, cleaning up all temporary files.
func (q *chunkQueue) Close() error {
	q.Lock()
	defer q.Unlock()
	if q.snapshot == nil {
		return nil
	}
	for _, waiters := range q.waiters {
		for _, waiter := range waiters {
			close(waiter)
		}
	}
	q.waiters = nil
	q.snapshot = nil
	err := os.RemoveAll(q.dir)
	if err != nil {
		return fmt.Errorf("failed to clean up state sync tempdir %v: %w", q.dir, err)
	}
	return nil
}

// Discard discards a chunk. It will be removed from the queue, available for allocation, and can
// be added and returned via Next() again. If the chunk is not already in the queue this does
// nothing, to avoid it being allocated to multiple fetchers.
func (q *chunkQueue) Discard(index uint32) error {
	q.Lock()
	defer q.Unlock()
	return q.discard(index)
}

// discard discards a chunk, scheduling it for refetching. The caller must hold the mutex lock.
func (q *chunkQueue) discard(index uint32) error {
	if q.snapshot == nil {
		return nil
	}
	path := q.chunkFiles[index]
	if path == "" {
		return nil
	}
	err := os.Remove(path)
	if err != nil {
		return fmt.Errorf("failed to remove chunk %v: %w", index, err)
	}
	delete(q.chunkFiles, index)
	delete(q.chunkReturned, index)
	delete(q.chunkAllocated, index)
	return nil
}

// DiscardSender discards all *unreturned* chunks from a given sender. If the caller wants to
// discard already returned chunks, this can be done via Discard().
func (q *chunkQueue) DiscardSender(peerID p2p.ID) error {
	q.Lock()
	defer q.Unlock()

	for index, sender := range q.chunkSenders {
		if sender == peerID && !q.chunkReturned[index] {
			err := q.discard(index)
			if err != nil {
				return err
			}
			delete(q.chunkSenders, index)
		}
	}
	return nil
}

// GetSender returns the sender of the chunk with the given index, or empty if not found.
func (q *chunkQueue) GetSender(index uint32) p2p.ID {
	q.Lock()
	defer q.Unlock()
	return q.chunkSenders[index]
}

// Has checks whether a chunk exists in the queue.
func (q *chunkQueue) Has(index uint32) bool {
	q.Lock()
	defer q.Unlock()
	return q.chunkFiles[index] != ""
}

// load loads a chunk from disk, or nil if the chunk is not in the queue. The caller must hold the
// mutex lock.
func (q *chunkQueue) load(index uint32) (*chunk, error) {
	path, ok := q.chunkFiles[index]
	if !ok {
		return nil, nil
	}
	body, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load chunk %v: %w", index, err)
	}
	return &chunk{
		Height: q.snapshot.Height,
		Format: q.snapshot.Format,
		Index:  index,
		Chunk:  body,
		Sender: q.chunkSenders[index],
	}, nil
}

// Next returns the next chunk from the queue, or errDone if all chunks have been returned. It
// blocks until the chunk is available. Concurrent Next() calls may return the same chunk.
func (q *chunkQueue) Next() (*chunk, error) {
	q.Lock()
	var chunk *chunk
	index, err := q.nextUp()
	if err == nil {
		chunk, err = q.load(index)
		if err == nil {
			q.chunkReturned[index] = true
		}
	}
	q.Unlock()
	if chunk != nil || err != nil {
		return chunk, err
	}

	select {
	case _, ok := <-q.WaitFor(index):
		if !ok {
			return nil, errDone // queue closed
		}
	case <-time.After(chunkTimeout):
		return nil, errTimeout
	}

	q.Lock()
	defer q.Unlock()
	chunk, err = q.load(index)
	if err != nil {
		return nil, err
	}
	q.chunkReturned[index] = true
	return chunk, nil
}

// nextUp returns the next chunk to be returned, or errDone if all chunks have been returned. The
// caller must hold the mutex lock.
func (q *chunkQueue) nextUp() (uint32, error) {
	if q.snapshot == nil {
		return 0, errDone
	}
	for i := uint32(0); i < q.snapshot.Chunks; i++ {
		if !q.chunkReturned[i] {
			return i, nil
		}
	}
	return 0, errDone
}

// Retry schedules a chunk to be retried, without refetching it.
func (q *chunkQueue) Retry(index uint32) {
	q.Lock()
	defer q.Unlock()
	delete(q.chunkReturned, index)
}

// RetryAll schedules all chunks to be retried, without refetching them.
func (q *chunkQueue) RetryAll() {
	q.Lock()
	defer q.Unlock()
	q.chunkReturned = make(map[uint32]bool)
}

// Size returns the total number of chunks for the snapshot and queue, or 0 when closed.
func (q *chunkQueue) Size() uint32 {
	q.Lock()
	defer q.Unlock()
	if q.snapshot == nil {
		return 0
	}
	return q.snapshot.Chunks
}

// WaitFor returns a channel that receives a chunk index when it arrives in the queue, or
// immediately if it has already arrived. The channel is closed without a value if the queue is
// closed or if the chunk index is not valid.
func (q *chunkQueue) WaitFor(index uint32) <-chan uint32 {
	q.Lock()
	defer q.Unlock()
	ch := make(chan uint32, 1)
	switch {
	case q.snapshot == nil:
		close(ch)
	case index >= q.snapshot.Chunks:
		close(ch)
	case q.chunkFiles[index] != "":
		ch <- index
		close(ch)
	default:
		if q.waiters[index] == nil {
			q.waiters[index] = make([]chan<- uint32, 0)
		}
		q.waiters[index] = append(q.waiters[index], ch)
	}
	return ch
}
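For context, a minimal usage sketch (not part of the commit). It is written as if it lived in the statesync package, since chunkQueue is unexported, and it assumes the package's snapshot type with the Height, Format, and Chunks fields used above.

package statesync

import "fmt"

func exampleChunkQueue() error {
	// A two-chunk snapshot; in practice this comes from a peer's snapshot offer.
	snap := &snapshot{Height: 100, Format: 1, Chunks: 2}
	queue, err := newChunkQueue(snap, "") // "" falls back to the system temp dir
	if err != nil {
		return err
	}
	defer queue.Close()

	// A fetcher asks for a chunk index to retrieve from a peer...
	index, err := queue.Allocate()
	if err != nil {
		return err
	}
	// ...and adds the chunk once the peer responds.
	added, err := queue.Add(&chunk{Height: 100, Format: 1, Index: index, Chunk: []byte("data"), Sender: "peer1"})
	if err != nil {
		return err
	}
	fmt.Println("chunk stored:", added)

	// The restorer consumes chunks in order; Next() blocks until chunk 0 arrives.
	c, err := queue.Next()
	if err != nil {
		return err
	}
	fmt.Println("applying chunk", c.Index, "from", c.Sender)
	return nil
}

Note that the queue persists each chunk to a temp file rather than holding it in memory, so restoring a large snapshot does not require buffering every chunk at once.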
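And a sketch (again not part of the commit, same sketch file as above) of the recovery paths available when a chunk turns out to be bad, for example if the application rejects it during restore:

func exampleRecovery(queue *chunkQueue, bad *chunk) error {
	// Option 1: hand the same on-disk chunk out again on the next Next() call,
	// without refetching it from the network.
	queue.Retry(bad.Index)

	// Option 2: remove the chunk entirely so it must be fetched and Add()ed
	// again, and also drop every unreturned chunk from the peer that sent it.
	if err := queue.Discard(bad.Index); err != nil {
		return err
	}
	return queue.DiscardSender(queue.GetSender(bad.Index))
}

As the doc comments above note, DiscardSender() only drops chunks that have not yet been returned by Next(); a chunk that was already returned has to be discarded explicitly via Discard().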