iavlstore: initial state sync implementation

This commit is contained in:
Erik Grinaker
2020-09-01 21:12:06 +02:00
parent ebcf9d46a0
commit 342651b89e
18 changed files with 3722 additions and 45 deletions

View File

@@ -218,7 +218,7 @@ build_c-amazonlinux:
# Run a 4-node testnet locally
localnet-start: localnet-stop build-docker-localnode
@if ! [ -f build/node0/config/genesis.json ]; then docker run --rm -v $(CURDIR)/build:/tendermint:Z tendermint/localnode testnet --config /etc/tendermint/config-template.toml --o . --starting-ip-address 192.167.10.2; fi
docker-compose up
docker-compose up node0 node1 node2
.PHONY: localnet-start
# Stop testnet

View File

@@ -4,23 +4,31 @@ import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
"github.com/cosmos/iavl"
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/abci/example/code"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots"
snapshottypes "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/version"
)
var _ types.Application = (*Application)(nil)
var _ abci.Application = (*Application)(nil)
type Application struct {
types.BaseApplication
store *iavl.MutableTree
abci.BaseApplication
store *iavl.MutableTree
snapshotManager *snapshots.Manager
logger log.Logger
}
func NewApplication(dataDir string) *Application {
// Set up IAVL store
db, err := dbm.NewGoLevelDB("iavlstore", dataDir)
if err != nil {
panic(err)
@@ -33,13 +41,27 @@ func NewApplication(dataDir string) *Application {
if err != nil {
panic(err)
}
// Set up snapshot storage
snapshotDir := filepath.Join(dataDir, "snapshots")
snapshotDB, err := dbm.NewGoLevelDB("metadata", snapshotDir)
if err != nil {
panic(err)
}
snapshotStore, err := snapshots.NewStore(snapshotDB, snapshotDir)
if err != nil {
panic(err)
}
return &Application{
store: store,
store: store,
snapshotManager: snapshots.NewManager(snapshotStore, &snapshots.IAVLSnapshotter{MutableTree: store}),
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)),
}
}
func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) {
return types.ResponseInfo{
func (app *Application) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{
Data: fmt.Sprintf(`{"size":%v}`, app.store.Size()),
Version: version.ABCIVersion,
AppVersion: 1,
@@ -48,43 +70,6 @@ func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo)
}
}
func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
_, _, err := parseTx(req.Tx)
if err != nil {
return types.ResponseCheckTx{
Code: code.CodeTypeEncodingError,
Log: err.Error(),
}
}
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}
func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverTx {
key, value, err := parseTx(req.Tx)
if err != nil {
panic(err)
}
app.store.Set(key, value)
return types.ResponseDeliverTx{Code: code.CodeTypeOK}
}
func (app *Application) Commit() types.ResponseCommit {
hash, _, err := app.store.SaveVersion()
if err != nil {
panic(err)
}
return types.ResponseCommit{Data: hash}
}
func (app *Application) Query(req types.RequestQuery) types.ResponseQuery {
_, value := app.store.Get(req.Data)
return types.ResponseQuery{
Height: app.store.Version(),
Key: req.Data,
Value: value,
}
}
// parseTx parses a tx in 'key=value' format into a key and value.
func parseTx(tx []byte) ([]byte, []byte, error) {
parts := bytes.Split(tx, []byte("="))
@@ -96,3 +81,148 @@ func parseTx(tx []byte) ([]byte, []byte, error) {
}
return parts[0], parts[1], nil
}
func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
_, _, err := parseTx(req.Tx)
if err != nil {
return abci.ResponseCheckTx{
Code: code.CodeTypeEncodingError,
Log: err.Error(),
}
}
return abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}
func (app *Application) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
key, value, err := parseTx(req.Tx)
if err != nil {
panic(err)
}
app.store.Set(key, value)
return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
}
func (app *Application) Commit() abci.ResponseCommit {
hash, height, err := app.store.SaveVersion()
if err != nil {
panic(err)
}
// Take asynchronous state sync snapshot every 10 blocks
if height%10 == 0 {
go func() {
app.logger.Info("Creating state snapshot", "height", height)
snapshot, err := app.snapshotManager.Create(uint64(height))
if err != nil {
app.logger.Error("Failed to create state snapshot", "height", height, "err", err)
return
}
app.logger.Info("Completed state snapshot", "height", height, "format", snapshot.Format)
}()
}
return abci.ResponseCommit{Data: hash}
}
func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery {
_, value := app.store.Get(req.Data)
return abci.ResponseQuery{
Height: app.store.Version(),
Key: req.Data,
Value: value,
}
}
// ListSnapshots implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots {
resp := abci.ResponseListSnapshots{Snapshots: []*abci.Snapshot{}}
if app.snapshotManager == nil {
return resp
}
snapshots, err := app.snapshotManager.List()
if err != nil {
app.logger.Error("Failed to list snapshots", "err", err)
return resp
}
for _, snapshot := range snapshots {
abciSnapshot, err := snapshot.ToABCI()
if err != nil {
app.logger.Error("Failed to list snapshots", "err", err)
return resp
}
resp.Snapshots = append(resp.Snapshots, &abciSnapshot)
}
return resp
}
// LoadSnapshotChunk implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
if app.snapshotManager == nil {
return abci.ResponseLoadSnapshotChunk{}
}
chunk, err := app.snapshotManager.LoadChunk(req.Height, req.Format, req.Chunk)
if err != nil {
app.logger.Error("Failed to load snapshot chunk", "height", req.Height, "format", req.Format,
"chunk", req.Chunk, "err")
return abci.ResponseLoadSnapshotChunk{}
}
return abci.ResponseLoadSnapshotChunk{Chunk: chunk}
}
// OfferSnapshot implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
if req.Snapshot == nil {
app.logger.Error("Received nil snapshot")
return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
}
snapshot, err := snapshottypes.SnapshotFromABCI(req.Snapshot)
if err != nil {
app.logger.Error("Failed to decode snapshot metadata", "err", err)
return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
}
err = app.snapshotManager.Restore(snapshot)
switch {
case err == nil:
return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}
case errors.Is(err, snapshottypes.ErrUnknownFormat):
return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}
case errors.Is(err, snapshottypes.ErrInvalidMetadata):
app.logger.Error("Rejecting invalid snapshot", "height", req.Snapshot.Height,
"format", req.Snapshot.Format, "err", err)
return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}
default:
app.logger.Error("Failed to restore snapshot", "height", req.Snapshot.Height,
"format", req.Snapshot.Format, "err", err)
// We currently don't support resetting the IAVL stores and retrying a different snapshot,
// so we ask Tendermint to abort all snapshot restoration.
return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}
}
}
// ApplySnapshotChunk implements the ABCI interface. It delegates to app.snapshotManager if set.
func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
_, err := app.snapshotManager.RestoreChunk(req.Chunk)
switch {
case err == nil:
return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}
case errors.Is(err, snapshottypes.ErrChunkHashMismatch):
app.logger.Error("Chunk checksum mismatch, rejecting sender and requesting refetch",
"chunk", req.Index, "sender", req.Sender, "err", err)
return abci.ResponseApplySnapshotChunk{
Result: abci.ResponseApplySnapshotChunk_RETRY,
RefetchChunks: []uint32{req.Index},
RejectSenders: []string{req.Sender},
}
default:
app.logger.Error("Failed to restore snapshot", "err", err)
return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT}
}
}

View File

@@ -0,0 +1,152 @@
package snapshots_test
import (
"bytes"
"crypto/sha256"
"errors"
"io"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
)
func checksum(b []byte) []byte {
hash := sha256.Sum256(b)
return hash[:]
}
func checksums(slice [][]byte) [][]byte {
checksums := [][]byte{}
for _, chunk := range slice {
checksums = append(checksums, checksum(chunk))
}
return checksums
}
func hash(chunks [][]byte) []byte {
hasher := sha256.New()
for _, chunk := range chunks {
hasher.Write(chunk)
}
return hasher.Sum(nil)
}
func makeChunks(chunks [][]byte) <-chan io.ReadCloser {
ch := make(chan io.ReadCloser, len(chunks))
for _, chunk := range chunks {
ch <- ioutil.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch
}
func readChunks(chunks <-chan io.ReadCloser) [][]byte {
bodies := [][]byte{}
for chunk := range chunks {
body, err := ioutil.ReadAll(chunk)
if err != nil {
panic(err)
}
bodies = append(bodies, body)
}
return bodies
}
type mockSnapshotter struct {
chunks [][]byte
}
func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
if format == 0 {
return types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
}
if ready != nil {
close(ready)
}
m.chunks = [][]byte{}
for reader := range chunks {
chunk, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
m.chunks = append(m.chunks, chunk)
}
return nil
}
func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format == 0 {
return nil, types.ErrUnknownFormat
}
ch := make(chan io.ReadCloser, len(m.chunks))
for _, chunk := range m.chunks {
ch <- ioutil.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch, nil
}
// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
// The snapshot will complete when the returned closer is called.
func setupBusyManager(t *testing.T) (*snapshots.Manager, func()) {
tempdir, err := ioutil.TempDir("", "")
require.NoError(t, err)
store, err := snapshots.NewStore(db.NewMemDB(), tempdir)
require.NoError(t, err)
hung := newHungSnapshotter()
mgr := snapshots.NewManager(store, hung)
go func() {
_, err := mgr.Create(1)
require.NoError(t, err)
}()
time.Sleep(10 * time.Millisecond)
closer := func() {
hung.Close()
os.RemoveAll(tempdir)
}
return mgr, closer
}
// hungSnapshotter can be used to test operations in progress. Call close to end the snapshot.
type hungSnapshotter struct {
ch chan struct{}
}
func newHungSnapshotter() *hungSnapshotter {
return &hungSnapshotter{
ch: make(chan struct{}),
}
}
func (m *hungSnapshotter) Close() {
close(m.ch)
}
func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
<-m.ch
ch := make(chan io.ReadCloser, 1)
ch <- ioutil.NopCloser(bytes.NewReader([]byte{}))
return ch, nil
}
func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
panic("not implemented")
}

View File

@@ -0,0 +1,244 @@
package snapshots
import (
"bufio"
"compress/zlib"
"errors"
"fmt"
"io"
"math"
"sort"
"strings"
"github.com/cosmos/iavl"
snapshottypes "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
"github.com/tendermint/tendermint/libs/protoio"
)
const (
snapshotChunkSize = uint64(10e6)
snapshotBufferSize = int(snapshotChunkSize)
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
// IAVLSnapshotter is a wrapper around an IAVL store that does snapshotting.
// It implements the Snapshotter interface.
type IAVLSnapshotter struct {
*iavl.MutableTree
}
// Snapshot implements Snapshotter.
func (tree *IAVLSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format != snapshottypes.CurrentFormat {
return nil, fmt.Errorf("%w %v", snapshottypes.ErrUnknownFormat, format)
}
if height == 0 {
return nil, errors.New("cannot snapshot height 0")
}
if height > uint64(tree.Version()) {
return nil, fmt.Errorf("cannot snapshot future height %v", height)
}
// Collect stores to snapshot (only IAVL stores are supported)
type namedStore struct {
*iavl.MutableTree
name string
}
stores := []namedStore{{
MutableTree: tree.MutableTree,
name: "iavlstore",
}}
sort.Slice(stores, func(i, j int) bool {
return strings.Compare(stores[i].name, stores[j].name) == -1
})
// Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel
ch := make(chan io.ReadCloser)
go func() {
// Set up a stream pipeline to serialize snapshot nodes:
// ExportNode -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
chunkWriter := NewChunkWriter(ch, snapshotChunkSize)
defer chunkWriter.Close()
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
defer func() {
if err := bufWriter.Flush(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
zWriter, err := zlib.NewWriterLevel(bufWriter, 7)
if err != nil {
chunkWriter.CloseWithError(fmt.Errorf("zlib error: %w", err))
return
}
defer func() {
if err := zWriter.Close(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
protoWriter := protoio.NewDelimitedWriter(zWriter)
defer func() {
if err := protoWriter.Close(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
// Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf
// messages. The first item contains a SnapshotStore with store metadata (i.e. name),
// and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes
// are demarcated by new SnapshotStore items.
for _, store := range stores {
tree, err := store.GetImmutable(int64(height))
if err != nil {
chunkWriter.CloseWithError(err)
return
}
exporter := tree.Export()
defer exporter.Close()
_, err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Store{
Store: &snapshottypes.SnapshotStoreItem{
Name: store.name,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
}
for {
node, err := exporter.Next()
if err == iavl.ExportDone {
break
} else if err != nil {
chunkWriter.CloseWithError(err)
return
}
_, err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_IAVL{
IAVL: &snapshottypes.SnapshotIAVLItem{
Key: node.Key,
Value: node.Value,
Height: int32(node.Height),
Version: node.Version,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
}
}
exporter.Close()
}
}()
return ch, nil
}
// Restore implements snapshottypes.Snapshotter.
func (tree *IAVLSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
if format != snapshottypes.CurrentFormat {
return fmt.Errorf("%w %v", snapshottypes.ErrUnknownFormat, format)
}
if height == 0 {
return fmt.Errorf("%w: cannot restore snapshot at height 0", snapshottypes.ErrInvalidMetadata)
}
if height > math.MaxInt64 {
return fmt.Errorf("%w: snapshot height %v cannot exceed %v", snapshottypes.ErrInvalidMetadata,
height, math.MaxInt64)
}
// Signal readiness. Must be done before the readers below are set up, since the zlib
// reader reads from the stream on initialization, potentially causing deadlocks.
if ready != nil {
close(ready)
}
// Set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
chunkReader := NewChunkReader(chunks)
defer chunkReader.Close()
zReader, err := zlib.NewReader(chunkReader)
if err != nil {
return fmt.Errorf("zlib error: %w", err)
}
defer zReader.Close()
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
defer protoReader.Close()
// Import nodes into stores. The first item is expected to be a SnapshotItem containing
// a SnapshotStoreItem, telling us which store to import into. The following items will contain
// SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF.
var importer *iavl.Importer
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("invalid protobuf message: %w", err)
}
switch item := item.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
if importer != nil {
err = importer.Commit()
if err != nil {
return fmt.Errorf("IAVL commit failed: %w", err)
}
importer.Close()
}
if item.Store.Name != "iavlstore" {
return fmt.Errorf("cannot import into non-IAVL store %q", item.Store.Name)
}
importer, err = tree.Import(int64(height))
if err != nil {
return fmt.Errorf("import failed: %w", err)
}
defer importer.Close()
case *snapshottypes.SnapshotItem_IAVL:
if importer == nil {
return fmt.Errorf("received IAVL node item before store item")
}
if item.IAVL.Height > math.MaxInt8 {
return fmt.Errorf("node height %v cannot exceed %v", item.IAVL.Height, math.MaxInt8)
}
node := &iavl.ExportNode{
Key: item.IAVL.Key,
Value: item.IAVL.Value,
Height: int8(item.IAVL.Height),
Version: item.IAVL.Version,
}
// Protobuf does not differentiate between []byte{} as nil, but fortunately IAVL does
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty.
if node.Key == nil {
node.Key = []byte{}
}
if node.Height == 0 && node.Value == nil {
node.Value = []byte{}
}
err := importer.Add(node)
if err != nil {
return fmt.Errorf("IAVL node import failed: %w", err)
}
default:
return fmt.Errorf("unknown snapshot item %T", item)
}
}
if importer != nil {
err := importer.Commit()
if err != nil {
return fmt.Errorf("IAVL commit failed: %w", err)
}
importer.Close()
}
_, err = tree.Load()
return err
}

View File

@@ -0,0 +1,260 @@
package snapshots
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
)
const (
opNone operation = ""
opSnapshot operation = "snapshot"
opPrune operation = "prune"
opRestore operation = "restore"
chunkBufferSize = 4
)
// operation represents a Manager operation. Only one operation can be in progress at a time.
type operation string
// restoreDone represents the result of a restore operation.
type restoreDone struct {
complete bool // if true, restore completed successfully (not prematurely)
err error // if non-nil, restore errored
}
// Manager manages snapshot and restore operations for an app, making sure only a single
// long-running operation is in progress at any given time, and provides convenience methods
// mirroring the ABCI interface.
//
// Although the ABCI interface (and this manager) passes chunks as byte slices, the internal
// snapshot/restore APIs use IO streams (i.e. chan io.ReadCloser), for two reasons:
//
// 1) In the future, ABCI should support streaming. Consider e.g. InitChain during chain
// upgrades, which currently passes the entire chain state as an in-memory byte slice.
// https://github.com/tendermint/tendermint/issues/5184
//
// 2) io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary
// errors via io.Pipe.CloseWithError().
type Manager struct {
store *Store
target types.Snapshotter
mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
}
// NewManager creates a new manager.
func NewManager(store *Store, target types.Snapshotter) *Manager {
return &Manager{
store: store,
target: target,
}
}
// begin starts an operation, or errors if one is in progress. It manages the mutex itself.
func (m *Manager) begin(op operation) error {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.beginLocked(op)
}
// beginLocked begins an operation while already holding the mutex.
func (m *Manager) beginLocked(op operation) error {
if op == opNone {
return errors.New("can't begin a none operation")
}
if m.operation != opNone {
return fmt.Errorf("a %v operation is in progress", m.operation)
}
m.operation = op
return nil
}
// end ends the current operation.
func (m *Manager) end() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.endLocked()
}
// endLocked ends the current operation while already holding the mutex.
func (m *Manager) endLocked() {
m.operation = opNone
if m.chRestore != nil {
close(m.chRestore)
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreChunkIndex = 0
}
// Create creates a snapshot and returns its metadata.
func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
if m == nil {
return nil, errors.New("no snapshot store configured")
}
err := m.begin(opSnapshot)
if err != nil {
return nil, err
}
defer m.end()
latest, err := m.store.GetLatest()
if err != nil {
return nil, fmt.Errorf("failed to examine latest snapshot: %w", err)
}
if latest != nil && latest.Height >= height {
return nil, fmt.Errorf("a more recent snapshot already exists at height %v", latest.Height)
}
chunks, err := m.target.Snapshot(height, types.CurrentFormat)
if err != nil {
return nil, err
}
return m.store.Save(height, types.CurrentFormat, chunks)
}
// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
func (m *Manager) List() ([]*types.Snapshot, error) {
return m.store.List()
}
// LoadChunk loads a chunk into a byte slice, mirroring ABCI LoadChunk. It can be called
// concurrently with other operations. If the chunk does not exist, nil is returned.
func (m *Manager) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
reader, err := m.store.LoadChunk(height, format, chunk)
if err != nil {
return nil, err
}
if reader == nil {
return nil, nil
}
defer reader.Close()
return ioutil.ReadAll(reader)
}
// Prune prunes snapshots, if no other operations are in progress.
func (m *Manager) Prune(retain uint32) (uint64, error) {
err := m.begin(opPrune)
if err != nil {
return 0, err
}
defer m.end()
return m.store.Prune(retain)
}
// Restore begins an async snapshot restoration, mirroring ABCI OfferSnapshot. Chunks must be fed
// via RestoreChunk() until the restore is complete or a chunk fails.
func (m *Manager) Restore(snapshot types.Snapshot) error {
if snapshot.Chunks == 0 {
return fmt.Errorf("%w: no chunks", types.ErrInvalidMetadata)
}
if uint32(len(snapshot.Metadata.ChunkHashes)) != snapshot.Chunks {
return fmt.Errorf("%w: snapshot has %v chunk hashes, but %v chunks",
types.ErrInvalidMetadata,
uint32(len(snapshot.Metadata.ChunkHashes)),
snapshot.Chunks)
}
m.mtx.Lock()
defer m.mtx.Unlock()
err := m.beginLocked(opRestore)
if err != nil {
return err
}
// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chReady := make(chan struct{}, 1)
chDone := make(chan restoreDone, 1)
go func() {
err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks, chReady)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()
// Check for any initial errors from the restore, before any chunks are fed.
select {
case done := <-chDone:
m.endLocked()
if done.err != nil {
return done.err
}
return errors.New("restore ended unexpectedly")
case <-chReady:
}
m.chRestore = chChunks
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreChunkIndex = 0
return nil
}
// RestoreChunk adds a chunk to an active snapshot restoration, mirroring ABCI ApplySnapshotChunk.
// Chunks must be given until the restore is complete, returning true, or a chunk errors.
func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.operation != opRestore {
return false, fmt.Errorf("no restore operation in progress")
}
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
return false, errors.New("received unexpected chunk")
}
// Check if any errors have occurred yet.
select {
case done := <-m.chRestoreDone:
m.endLocked()
if done.err != nil {
return false, done.err
}
return false, errors.New("restore ended unexpectedly")
default:
}
// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, fmt.Errorf("%w (expected %x, got %x)",
types.ErrChunkHashMismatch, hash, expected)
}
// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- ioutil.NopCloser(bytes.NewReader(chunk))
m.restoreChunkIndex++
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
close(m.chRestore)
m.chRestore = nil
done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
return false, done.err
}
if !done.complete {
return false, errors.New("restore ended prematurely")
}
return true, nil
}
return false, nil
}

View File

@@ -0,0 +1,221 @@
package snapshots_test
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
)
func TestManager_List(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
manager := snapshots.NewManager(store, nil)
mgrList, err := manager.List()
require.NoError(t, err)
storeList, err := store.List()
require.NoError(t, err)
require.NotEmpty(t, storeList)
assert.Equal(t, storeList, mgrList)
// list should not block or error on busy managers
manager, teardown = setupBusyManager(t)
defer teardown()
list, err := manager.List()
require.NoError(t, err)
assert.Equal(t, []*types.Snapshot{}, list)
}
func TestManager_LoadChunk(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
manager := snapshots.NewManager(store, nil)
// Existing chunk should return body
chunk, err := manager.LoadChunk(2, 1, 1)
require.NoError(t, err)
assert.Equal(t, []byte{2, 1, 1}, chunk)
// Missing chunk should return nil
chunk, err = manager.LoadChunk(2, 1, 9)
require.NoError(t, err)
assert.Nil(t, chunk)
// LoadChunk should not block or error on busy managers
manager, teardown = setupBusyManager(t)
defer teardown()
chunk, err = manager.LoadChunk(2, 1, 0)
require.NoError(t, err)
assert.Nil(t, chunk)
}
func TestManager_Take(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
snapshotter := &mockSnapshotter{
chunks: [][]byte{
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
},
}
manager := snapshots.NewManager(store, snapshotter)
// nil manager should return error
_, err := (*snapshots.Manager)(nil).Create(1)
require.Error(t, err)
// creating a snapshot at a lower height than the latest should error
_, err = manager.Create(3)
require.Error(t, err)
// creating a snapshot at a higher height should be fine, and should return it
snapshot, err := manager.Create(5)
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{
Height: 5,
Format: types.CurrentFormat,
Chunks: 3,
Hash: []uint8{0x47, 0xe4, 0xee, 0x7f, 0x21, 0x1f, 0x73, 0x26, 0x5d, 0xd1, 0x76, 0x58, 0xf6, 0xe2, 0x1c, 0x13, 0x18, 0xbd, 0x6c, 0x81, 0xf3, 0x75, 0x98, 0xe2, 0xa, 0x27, 0x56, 0x29, 0x95, 0x42, 0xef, 0xcf},
Metadata: types.Metadata{
ChunkHashes: [][]byte{
checksum([]byte{1, 2, 3}),
checksum([]byte{4, 5, 6}),
checksum([]byte{7, 8, 9}),
},
},
}, snapshot)
storeSnapshot, chunks, err := store.Load(snapshot.Height, snapshot.Format)
require.NoError(t, err)
assert.Equal(t, snapshot, storeSnapshot)
assert.Equal(t, [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, readChunks(chunks))
// creating a snapshot while a different snapshot is being created should error
manager, teardown = setupBusyManager(t)
defer teardown()
_, err = manager.Create(9)
require.Error(t, err)
}
func TestManager_Prune(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
manager := snapshots.NewManager(store, nil)
pruned, err := manager.Prune(2)
require.NoError(t, err)
assert.EqualValues(t, 1, pruned)
list, err := manager.List()
require.NoError(t, err)
assert.Len(t, list, 3)
// Prune should error while a snapshot is being taken
manager, teardown = setupBusyManager(t)
defer teardown()
_, err = manager.Prune(2)
require.Error(t, err)
}
func TestManager_Restore(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
target := &mockSnapshotter{}
manager := snapshots.NewManager(store, target)
chunks := [][]byte{
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
}
// Restore errors on invalid format
err := manager.Restore(types.Snapshot{
Height: 3,
Format: 0,
Hash: []byte{1, 2, 3},
Chunks: uint32(len(chunks)),
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.Error(t, err)
require.Equal(t, types.ErrUnknownFormat, err)
// Restore errors on no chunks
err = manager.Restore(types.Snapshot{Height: 3, Format: 1, Hash: []byte{1, 2, 3}})
require.Error(t, err)
// Restore errors on chunk and chunkhashes mismatch
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 1,
Hash: []byte{1, 2, 3},
Chunks: 4,
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.Error(t, err)
// Starting a restore works
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 1,
Hash: []byte{1, 2, 3},
Chunks: 3,
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.NoError(t, err)
// While the restore is in progress, any other operations fail
_, err = manager.Create(4)
require.Error(t, err)
_, err = manager.Prune(1)
require.Error(t, err)
// Feeding an invalid chunk should error due to invalid checksum, but not abort restoration.
_, err = manager.RestoreChunk([]byte{9, 9, 9})
require.Error(t, err)
require.True(t, errors.Is(err, types.ErrChunkHashMismatch))
// Feeding the chunks should work
for i, chunk := range chunks {
done, err := manager.RestoreChunk(chunk)
require.NoError(t, err)
if i == len(chunks)-1 {
assert.True(t, done)
} else {
assert.False(t, done)
}
}
assert.Equal(t, chunks, target.chunks)
// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 1,
Hash: []byte{1, 2, 3},
Chunks: 3,
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.Error(t, err)
// But if we clear out the target we should be able to start a new restore. This time we'll
// fail it with a checksum error. That error should stop the operation, so that we can do
// a prune operation right after.
target.chunks = nil
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 1,
Hash: []byte{1, 2, 3},
Chunks: 3,
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.NoError(t, err)
}

View File

@@ -0,0 +1,366 @@
package snapshots
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/gogo/protobuf/proto"
db "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
)
const (
// keyPrefixSnapshot is the prefix for snapshot database keys
keyPrefixSnapshot byte = 0x01
)
// Store is a snapshot store, containing snapshot metadata and binary chunks.
type Store struct {
db db.DB
dir string
mtx sync.Mutex
saving map[uint64]bool // heights currently being saved
}
// NewStore creates a new snapshot store.
func NewStore(db db.DB, dir string) (*Store, error) {
if dir == "" {
return nil, errors.New("snapshot directory not given")
}
err := os.MkdirAll(dir, 0755)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot directory %q: %w", dir, err)
}
return &Store{
db: db,
dir: dir,
saving: make(map[uint64]bool),
}, nil
}
// Delete deletes a snapshot.
func (s *Store) Delete(height uint64, format uint32) error {
s.mtx.Lock()
saving := s.saving[height]
s.mtx.Unlock()
if saving {
return fmt.Errorf("snapshot for height %v format %v is currently being saved", height, format)
}
err := s.db.DeleteSync(encodeKey(height, format))
if err != nil {
return fmt.Errorf("failed to delete snapshot for height %v format %v: %w",
height, format, err)
}
err = os.RemoveAll(s.pathSnapshot(height, format))
if err != nil {
return fmt.Errorf("failed to delete snapshot chunks for height %v format %v: %w",
height, format, err)
}
return nil
}
// Get fetches snapshot info from the database.
func (s *Store) Get(height uint64, format uint32) (*types.Snapshot, error) {
bytes, err := s.db.Get(encodeKey(height, format))
if err != nil {
return nil, fmt.Errorf("failed to fetch snapshot metadata for height %v format %v: %w",
height, format, err)
}
if bytes == nil {
return nil, nil
}
snapshot := &types.Snapshot{}
err = proto.Unmarshal(bytes, snapshot)
if err != nil {
return nil, fmt.Errorf("failed to decode snapshot metadata for height %v format %v: %w",
height, format, err)
}
if snapshot.Metadata.ChunkHashes == nil {
snapshot.Metadata.ChunkHashes = [][]byte{}
}
return snapshot, nil
}
// Get fetches the latest snapshot from the database, if any.
func (s *Store) GetLatest() (*types.Snapshot, error) {
iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(math.MaxUint64, math.MaxUint32))
if err != nil {
return nil, fmt.Errorf("failed to find latest snapshot: %w", err)
}
defer iter.Close()
var snapshot *types.Snapshot
if iter.Valid() {
snapshot = &types.Snapshot{}
err := proto.Unmarshal(iter.Value(), snapshot)
if err != nil {
return nil, fmt.Errorf("failed to decode latest snapshot: %w", err)
}
}
err = iter.Error()
if err != nil {
return nil, fmt.Errorf("failed to find latest snapshot: %w", err)
}
return snapshot, nil
}
// List lists snapshots, in reverse order (newest first).
func (s *Store) List() ([]*types.Snapshot, error) {
iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(math.MaxUint64, math.MaxUint32))
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
defer iter.Close()
snapshots := make([]*types.Snapshot, 0)
for ; iter.Valid(); iter.Next() {
snapshot := &types.Snapshot{}
err := proto.Unmarshal(iter.Value(), snapshot)
if err != nil {
return nil, fmt.Errorf("failed to decode snapshot info: %w", err)
}
snapshots = append(snapshots, snapshot)
}
err = iter.Error()
if err != nil {
return nil, err
}
return snapshots, nil
}
// Load loads a snapshot (both metadata and binary chunks). The chunks must be consumed and closed.
// Returns nil if the snapshot does not exist.
func (s *Store) Load(height uint64, format uint32) (*types.Snapshot, <-chan io.ReadCloser, error) {
snapshot, err := s.Get(height, format)
if err != nil {
return nil, nil, err
}
if snapshot == nil {
return nil, nil, nil
}
ch := make(chan io.ReadCloser)
go func() {
defer close(ch)
for i := uint32(0); i < snapshot.Chunks; i++ {
pr, pw := io.Pipe()
ch <- pr
chunk, err := s.loadChunkFile(height, format, i)
if err != nil {
pw.CloseWithError(err)
return
}
defer chunk.Close()
_, err = io.Copy(pw, chunk)
if err != nil {
pw.CloseWithError(err)
return
}
chunk.Close()
pw.Close()
}
}()
return snapshot, ch, nil
}
// LoadChunk loads a chunk from disk, or returns nil if it does not exist. The caller must call
// Close() on it when done.
func (s *Store) LoadChunk(height uint64, format uint32, chunk uint32) (io.ReadCloser, error) {
path := s.pathChunk(height, format, chunk)
file, err := os.Open(path)
if os.IsNotExist(err) {
return nil, nil
}
return file, err
}
// loadChunkFile loads a chunk from disk, and errors if it does not exist.
func (s *Store) loadChunkFile(height uint64, format uint32, chunk uint32) (io.ReadCloser, error) {
path := s.pathChunk(height, format, chunk)
file, err := os.Open(path)
if err != nil {
return nil, err
}
return file, nil
}
// Prune removes old snapshots. The given number of most recent heights (regardless of format) are retained.
func (s *Store) Prune(retain uint32) (uint64, error) {
iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(math.MaxUint64, math.MaxUint32))
if err != nil {
return 0, fmt.Errorf("failed to prune snapshots: %w", err)
}
defer iter.Close()
pruned := uint64(0)
prunedHeights := make(map[uint64]bool)
skip := make(map[uint64]bool)
for ; iter.Valid(); iter.Next() {
height, format, err := decodeKey(iter.Key())
if err != nil {
return 0, fmt.Errorf("failed to prune snapshots: %w", err)
}
if skip[height] || uint32(len(skip)) < retain {
skip[height] = true
continue
}
err = s.Delete(height, format)
if err != nil {
return 0, fmt.Errorf("failed to prune snapshots: %w", err)
}
pruned++
prunedHeights[height] = true
}
// Since Delete() deletes a specific format, while we want to prune a height, we clean up
// the height directory as well
for height, ok := range prunedHeights {
if ok {
err = os.Remove(s.pathHeight(height))
if err != nil {
return 0, fmt.Errorf("failed to remove snapshot directory for height %v", height)
}
}
}
err = iter.Error()
if err != nil {
return 0, err
}
return pruned, nil
}
// Save saves a snapshot to disk, returning it.
func (s *Store) Save(
height uint64, format uint32, chunks <-chan io.ReadCloser,
) (*types.Snapshot, error) {
defer DrainChunks(chunks)
if height == 0 {
return nil, errors.New("snapshot height cannot be 0")
}
s.mtx.Lock()
saving := s.saving[height]
s.saving[height] = true
s.mtx.Unlock()
if saving {
return nil, fmt.Errorf("a snapshot for height %v is already being saved", height)
}
defer func() {
s.mtx.Lock()
delete(s.saving, height)
s.mtx.Unlock()
}()
exists, err := s.db.Has(encodeKey(height, format))
if err != nil {
return nil, err
}
if exists {
return nil, fmt.Errorf("snapshot already exists for height %v format %v", height, format)
}
snapshot := &types.Snapshot{
Height: height,
Format: format,
}
index := uint32(0)
snapshotHasher := sha256.New()
chunkHasher := sha256.New()
for chunkBody := range chunks {
defer chunkBody.Close() // nolint: staticcheck
dir := s.pathSnapshot(height, format)
err = os.MkdirAll(dir, 0755)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot directory %q: %w", dir, err)
}
path := s.pathChunk(height, format, index)
file, err := os.Create(path)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot chunk file %q: %w", path, err)
}
defer file.Close() // nolint: staticcheck
chunkHasher.Reset()
_, err = io.Copy(io.MultiWriter(file, chunkHasher, snapshotHasher), chunkBody)
if err != nil {
return nil, fmt.Errorf("failed to generate snapshot chunk %v: %w", index, err)
}
err = file.Close()
if err != nil {
return nil, fmt.Errorf("failed to close snapshot chunk %v: %w", index, err)
}
err = chunkBody.Close()
if err != nil {
return nil, fmt.Errorf("failed to close snapshot chunk %v: %w", index, err)
}
snapshot.Metadata.ChunkHashes = append(snapshot.Metadata.ChunkHashes, chunkHasher.Sum(nil))
index++
}
snapshot.Chunks = index
snapshot.Hash = snapshotHasher.Sum(nil)
err = s.saveSnapshot(snapshot)
if err != nil {
return nil, err
}
return snapshot, nil
}
// saveSnapshot saves snapshot metadata to the database.
func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
value, err := proto.Marshal(snapshot)
if err != nil {
return fmt.Errorf("failed to encode snapshot metadata: %w", err)
}
err = s.db.SetSync(encodeKey(snapshot.Height, snapshot.Format), value)
if err != nil {
return fmt.Errorf("failed to store snapshot: %w", err)
}
return nil
}
// pathHeight generates the path to a height, containing multiple snapshot formats.
func (s *Store) pathHeight(height uint64) string {
return filepath.Join(s.dir, strconv.FormatUint(height, 10))
}
// pathSnapshot generates a snapshot path, as a specific format under a height.
func (s *Store) pathSnapshot(height uint64, format uint32) string {
return filepath.Join(s.pathHeight(height), strconv.FormatUint(uint64(format), 10))
}
// pathChunk generates a snapshot chunk path.
func (s *Store) pathChunk(height uint64, format uint32, chunk uint32) string {
return filepath.Join(s.pathSnapshot(height, format), strconv.FormatUint(uint64(chunk), 10))
}
// decodeKey decodes a snapshot key.
func decodeKey(k []byte) (uint64, uint32, error) {
if len(k) != 13 {
return 0, 0, fmt.Errorf("invalid snapshot key with length %v", len(k))
}
if k[0] != keyPrefixSnapshot {
return 0, 0, fmt.Errorf("invalid snapshot key prefix %x", k[0])
}
height := binary.BigEndian.Uint64(k[1:9])
format := binary.BigEndian.Uint32(k[9:13])
return height, format, nil
}
// encodeKey encodes a snapshot key.
func encodeKey(height uint64, format uint32) []byte {
k := make([]byte, 13)
k[0] = keyPrefixSnapshot
binary.BigEndian.PutUint64(k[1:], height)
binary.BigEndian.PutUint32(k[9:], format)
return k
}

View File

@@ -0,0 +1,366 @@
package snapshots_test
import (
"bytes"
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types"
)
func setupStore(t *testing.T) (*snapshots.Store, func()) {
tempdir, err := ioutil.TempDir("", "snapshots")
require.NoError(t, err)
store, err := snapshots.NewStore(db.NewMemDB(), tempdir)
require.NoError(t, err)
_, err = store.Save(1, 1, makeChunks([][]byte{
{1, 1, 0}, {1, 1, 1},
}))
require.NoError(t, err)
_, err = store.Save(2, 1, makeChunks([][]byte{
{2, 1, 0}, {2, 1, 1},
}))
require.NoError(t, err)
_, err = store.Save(2, 2, makeChunks([][]byte{
{2, 2, 0}, {2, 2, 1}, {2, 2, 2},
}))
require.NoError(t, err)
_, err = store.Save(3, 2, makeChunks([][]byte{
{3, 2, 0}, {3, 2, 1}, {3, 2, 2},
}))
require.NoError(t, err)
teardown := func() {
err := os.RemoveAll(tempdir)
if err != nil {
t.Logf("Failed to remove tempdir %q: %v", tempdir, err)
}
}
return store, teardown
}
func TestNewStore(t *testing.T) {
tempdir, err := ioutil.TempDir("", "snapshots")
require.NoError(t, err)
defer os.RemoveAll(tempdir)
_, err = snapshots.NewStore(db.NewMemDB(), tempdir)
require.NoError(t, err)
}
func TestNewStore_ErrNoDir(t *testing.T) {
_, err := snapshots.NewStore(db.NewMemDB(), "")
require.Error(t, err)
}
func TestNewStore_ErrDirFailure(t *testing.T) {
tempfile, err := ioutil.TempFile("", "snapshots")
require.NoError(t, err)
defer func() {
os.RemoveAll(tempfile.Name())
tempfile.Close()
}()
tempdir := filepath.Join(tempfile.Name(), "subdir")
_, err = snapshots.NewStore(db.NewMemDB(), tempdir)
require.Error(t, err)
}
func TestStore_Delete(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Deleting a snapshot should remove it
err := store.Delete(2, 2)
require.NoError(t, err)
snapshot, err := store.Get(2, 2)
require.NoError(t, err)
assert.Nil(t, snapshot)
snapshots, err := store.List()
require.NoError(t, err)
assert.Len(t, snapshots, 3)
// Deleting it again should not error
err = store.Delete(2, 2)
require.NoError(t, err)
// Deleting a snapshot being saved should error
ch := make(chan io.ReadCloser)
go store.Save(9, 1, ch)
time.Sleep(10 * time.Millisecond)
err = store.Delete(9, 1)
require.Error(t, err)
// But after it's saved it should work
close(ch)
time.Sleep(10 * time.Millisecond)
err = store.Delete(9, 1)
require.NoError(t, err)
}
func TestStore_Get(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Loading a missing snapshot should return nil
snapshot, err := store.Get(9, 9)
require.NoError(t, err)
assert.Nil(t, snapshot)
// Loading a snapshot should returns its metadata
snapshot, err = store.Get(2, 1)
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{
Height: 2,
Format: 1,
Chunks: 2,
Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}),
Metadata: types.Metadata{
ChunkHashes: [][]byte{
checksum([]byte{2, 1, 0}),
checksum([]byte{2, 1, 1}),
},
},
}, snapshot)
}
func TestStore_GetLatest(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Loading a missing snapshot should return nil
snapshot, err := store.GetLatest()
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{
Height: 3,
Format: 2,
Chunks: 3,
Hash: hash([][]byte{
{3, 2, 0},
{3, 2, 1},
{3, 2, 2},
}),
Metadata: types.Metadata{
ChunkHashes: checksums([][]byte{
{3, 2, 0},
{3, 2, 1},
{3, 2, 2},
}),
},
}, snapshot)
}
func TestStore_List(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
snapshots, err := store.List()
require.NoError(t, err)
require.Equal(t, []*types.Snapshot{
{Height: 3, Format: 2, Chunks: 3, Hash: hash([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}})},
},
{Height: 2, Format: 2, Chunks: 3, Hash: hash([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}})},
},
{Height: 2, Format: 1, Chunks: 2, Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 1, 0}, {2, 1, 1}})},
},
{Height: 1, Format: 1, Chunks: 2, Hash: hash([][]byte{{1, 1, 0}, {1, 1, 1}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{1, 1, 0}, {1, 1, 1}})},
},
}, snapshots)
}
func TestStore_Load(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Loading a missing snapshot should return nil
snapshot, chunks, err := store.Load(9, 9)
require.NoError(t, err)
assert.Nil(t, snapshot)
assert.Nil(t, chunks)
// Loading a snapshot should returns its metadata and chunks
snapshot, chunks, err = store.Load(2, 1)
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{
Height: 2,
Format: 1,
Chunks: 2,
Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}),
Metadata: types.Metadata{
ChunkHashes: [][]byte{
checksum([]byte{2, 1, 0}),
checksum([]byte{2, 1, 1}),
},
},
}, snapshot)
for i := uint32(0); i < snapshot.Chunks; i++ {
reader, ok := <-chunks
require.True(t, ok)
chunk, err := ioutil.ReadAll(reader)
require.NoError(t, err)
err = reader.Close()
require.NoError(t, err)
assert.Equal(t, []byte{2, 1, byte(i)}, chunk)
}
assert.Empty(t, chunks)
}
func TestStore_LoadChunk(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Loading a missing snapshot should return nil
chunk, err := store.LoadChunk(9, 9, 0)
require.NoError(t, err)
assert.Nil(t, chunk)
// Loading a missing chunk index should return nil
chunk, err = store.LoadChunk(2, 1, 2)
require.NoError(t, err)
require.Nil(t, chunk)
// Loading a chunk should returns a content reader
chunk, err = store.LoadChunk(2, 1, 0)
require.NoError(t, err)
require.NotNil(t, chunk)
body, err := ioutil.ReadAll(chunk)
require.NoError(t, err)
assert.Equal(t, []byte{2, 1, 0}, body)
err = chunk.Close()
require.NoError(t, err)
}
func TestStore_Prune(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Pruning too many snapshots should be fine
pruned, err := store.Prune(4)
require.NoError(t, err)
assert.EqualValues(t, 0, pruned)
snapshots, err := store.List()
require.NoError(t, err)
assert.Len(t, snapshots, 4)
// Pruning until the last two heights should leave three snapshots (for two heights)
pruned, err = store.Prune(2)
require.NoError(t, err)
assert.EqualValues(t, 1, pruned)
snapshots, err = store.List()
require.NoError(t, err)
require.Equal(t, []*types.Snapshot{
{Height: 3, Format: 2, Chunks: 3, Hash: hash([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}})},
},
{Height: 2, Format: 2, Chunks: 3, Hash: hash([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}})},
},
{Height: 2, Format: 1, Chunks: 2, Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}),
Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 1, 0}, {2, 1, 1}})},
},
}, snapshots)
// Pruning all heights should also be fine
pruned, err = store.Prune(0)
require.NoError(t, err)
assert.EqualValues(t, 3, pruned)
snapshots, err = store.List()
require.NoError(t, err)
assert.Empty(t, snapshots)
}
func TestStore_Save(t *testing.T) {
store, teardown := setupStore(t)
defer teardown()
// Saving a snapshot should work
snapshot, err := store.Save(4, 1, makeChunks([][]byte{{1}, {2}}))
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{
Height: 4,
Format: 1,
Chunks: 2,
Hash: hash([][]byte{{1}, {2}}),
Metadata: types.Metadata{
ChunkHashes: [][]byte{
checksum([]byte{1}),
checksum([]byte{2}),
},
},
}, snapshot)
loaded, err := store.Get(snapshot.Height, snapshot.Format)
require.NoError(t, err)
assert.Equal(t, snapshot, loaded)
// Saving an existing snapshot should error
_, err = store.Save(4, 1, makeChunks([][]byte{{1}, {2}}))
require.Error(t, err)
// Saving at height 0 should error
_, err = store.Save(0, 1, makeChunks([][]byte{{1}, {2}}))
require.Error(t, err)
// Saving at format 0 should be fine
_, err = store.Save(1, 0, makeChunks([][]byte{{1}, {2}}))
require.NoError(t, err)
// Saving a snapshot with no chunks should be fine, as should loading it
_, err = store.Save(5, 1, makeChunks([][]byte{}))
require.NoError(t, err)
snapshot, chunks, err := store.Load(5, 1)
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{Height: 5, Format: 1, Hash: hash([][]byte{}), Metadata: types.Metadata{ChunkHashes: [][]byte{}}}, snapshot)
assert.Empty(t, chunks)
// Saving a snapshot should error if a chunk reader returns an error, and it should empty out
// the channel
someErr := errors.New("boom")
pr, pw := io.Pipe()
err = pw.CloseWithError(someErr)
require.NoError(t, err)
ch := make(chan io.ReadCloser, 2)
ch <- pr
ch <- ioutil.NopCloser(bytes.NewBuffer([]byte{0xff}))
close(ch)
_, err = store.Save(6, 1, ch)
require.Error(t, err)
require.True(t, errors.Is(err, someErr))
assert.Empty(t, ch)
// Saving a snapshot should error if a snapshot is already in progress for the same height,
// regardless of format. However, a different height should succeed.
ch = make(chan io.ReadCloser)
go store.Save(7, 1, ch)
time.Sleep(10 * time.Millisecond)
_, err = store.Save(7, 2, makeChunks(nil))
require.Error(t, err)
_, err = store.Save(8, 1, makeChunks(nil))
require.NoError(t, err)
close(ch)
}

View File

@@ -0,0 +1,39 @@
package types
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
)
// Converts an ABCI snapshot to a snapshot. Mainly to decode the SDK metadata.
func SnapshotFromABCI(in *abci.Snapshot) (Snapshot, error) {
snapshot := Snapshot{
Height: in.Height,
Format: in.Format,
Chunks: in.Chunks,
Hash: in.Hash,
}
err := proto.Unmarshal(in.Metadata, &snapshot.Metadata)
if err != nil {
return Snapshot{}, fmt.Errorf("failed to unmarshal snapshot metadata: %w", err)
}
return snapshot, nil
}
// Converts a Snapshot to its ABCI representation. Mainly to encode the SDK metadata.
func (s Snapshot) ToABCI() (abci.Snapshot, error) {
out := abci.Snapshot{
Height: s.Height,
Format: s.Format,
Chunks: s.Chunks,
Hash: s.Hash,
}
var err error
out.Metadata, err = proto.Marshal(&s.Metadata)
if err != nil {
return abci.Snapshot{}, fmt.Errorf("failed to marshal snapshot metadata: %w", err)
}
return out, nil
}

View File

@@ -0,0 +1,16 @@
package types
import (
"errors"
)
var (
// ErrUnknownFormat is returned when an unknown format is used.
ErrUnknownFormat = errors.New("unknown snapshot format")
// ErrChunkHashMismatch is returned when chunk hash verification failed.
ErrChunkHashMismatch = errors.New("chunk hash verification failed")
// ErrInvalidMetadata is returned when the snapshot metadata is invalid.
ErrInvalidMetadata = errors.New("invalid snapshot metadata")
)

View File

@@ -0,0 +1,6 @@
package types
// CurrentFormat is the currently used format for snapshots. Snapshots using the same format
// must be identical across all nodes for a given height, so this must be bumped when the binary
// snapshot output changes.
const CurrentFormat uint32 = 1

View File

@@ -0,0 +1,667 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: cosmos/base/snapshots/v1beta1/snapshot.proto
package types
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// Snapshot contains Tendermint state sync snapshot info.
type Snapshot struct {
Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"`
Format uint32 `protobuf:"varint,2,opt,name=format,proto3" json:"format,omitempty"`
Chunks uint32 `protobuf:"varint,3,opt,name=chunks,proto3" json:"chunks,omitempty"`
Hash []byte `protobuf:"bytes,4,opt,name=hash,proto3" json:"hash,omitempty"`
Metadata Metadata `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata"`
}
func (m *Snapshot) Reset() { *m = Snapshot{} }
func (m *Snapshot) String() string { return proto.CompactTextString(m) }
func (*Snapshot) ProtoMessage() {}
func (*Snapshot) Descriptor() ([]byte, []int) {
return fileDescriptor_dd7a3c9b0a19e1ee, []int{0}
}
func (m *Snapshot) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Snapshot) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Snapshot.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Snapshot) XXX_Merge(src proto.Message) {
xxx_messageInfo_Snapshot.Merge(m, src)
}
func (m *Snapshot) XXX_Size() int {
return m.Size()
}
func (m *Snapshot) XXX_DiscardUnknown() {
xxx_messageInfo_Snapshot.DiscardUnknown(m)
}
var xxx_messageInfo_Snapshot proto.InternalMessageInfo
func (m *Snapshot) GetHeight() uint64 {
if m != nil {
return m.Height
}
return 0
}
func (m *Snapshot) GetFormat() uint32 {
if m != nil {
return m.Format
}
return 0
}
func (m *Snapshot) GetChunks() uint32 {
if m != nil {
return m.Chunks
}
return 0
}
func (m *Snapshot) GetHash() []byte {
if m != nil {
return m.Hash
}
return nil
}
func (m *Snapshot) GetMetadata() Metadata {
if m != nil {
return m.Metadata
}
return Metadata{}
}
// Metadata contains SDK-specific snapshot metadata.
type Metadata struct {
ChunkHashes [][]byte `protobuf:"bytes,1,rep,name=chunk_hashes,json=chunkHashes,proto3" json:"chunk_hashes,omitempty"`
}
func (m *Metadata) Reset() { *m = Metadata{} }
func (m *Metadata) String() string { return proto.CompactTextString(m) }
func (*Metadata) ProtoMessage() {}
func (*Metadata) Descriptor() ([]byte, []int) {
return fileDescriptor_dd7a3c9b0a19e1ee, []int{1}
}
func (m *Metadata) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Metadata.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Metadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_Metadata.Merge(m, src)
}
func (m *Metadata) XXX_Size() int {
return m.Size()
}
func (m *Metadata) XXX_DiscardUnknown() {
xxx_messageInfo_Metadata.DiscardUnknown(m)
}
var xxx_messageInfo_Metadata proto.InternalMessageInfo
func (m *Metadata) GetChunkHashes() [][]byte {
if m != nil {
return m.ChunkHashes
}
return nil
}
func init() {
proto.RegisterType((*Snapshot)(nil), "cosmos.base.snapshots.v1beta1.Snapshot")
proto.RegisterType((*Metadata)(nil), "cosmos.base.snapshots.v1beta1.Metadata")
}
func init() {
proto.RegisterFile("cosmos/base/snapshots/v1beta1/snapshot.proto", fileDescriptor_dd7a3c9b0a19e1ee)
}
var fileDescriptor_dd7a3c9b0a19e1ee = []byte{
// 288 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x31, 0x4e, 0xc3, 0x30,
0x14, 0x86, 0x63, 0x1a, 0xaa, 0xc8, 0x0d, 0x8b, 0x85, 0x90, 0x85, 0x84, 0x09, 0x5d, 0xc8, 0xd0,
0x3a, 0x2a, 0xdc, 0xa0, 0x03, 0x82, 0x81, 0x25, 0x6c, 0x2c, 0xc8, 0x49, 0x4d, 0x5c, 0x55, 0xa9,
0xa3, 0x3e, 0x17, 0x89, 0x5b, 0x70, 0x15, 0x6e, 0xd1, 0xb1, 0x23, 0x13, 0x42, 0xc9, 0x45, 0x50,
0x1c, 0x13, 0x31, 0x75, 0xca, 0xfb, 0xbf, 0x7c, 0x4f, 0xcf, 0xfa, 0xf1, 0x24, 0xd7, 0x50, 0x6a,
0x48, 0x32, 0x01, 0x32, 0x81, 0xb5, 0xa8, 0x40, 0x69, 0x03, 0xc9, 0xdb, 0x2c, 0x93, 0x46, 0xcc,
0x7a, 0xc2, 0xab, 0x8d, 0x36, 0x9a, 0x5c, 0x74, 0x36, 0x6f, 0x6d, 0xde, 0xdb, 0xdc, 0xd9, 0xe7,
0xa7, 0x85, 0x2e, 0xb4, 0x35, 0x93, 0x76, 0xea, 0x96, 0xc6, 0x9f, 0x08, 0x07, 0x4f, 0xce, 0x25,
0x67, 0x78, 0xa8, 0xe4, 0xb2, 0x50, 0x86, 0xa2, 0x08, 0xc5, 0x7e, 0xea, 0x52, 0xcb, 0x5f, 0xf5,
0xa6, 0x14, 0x86, 0x1e, 0x45, 0x28, 0x3e, 0x49, 0x5d, 0x6a, 0x79, 0xae, 0xb6, 0xeb, 0x15, 0xd0,
0x41, 0xc7, 0xbb, 0x44, 0x08, 0xf6, 0x95, 0x00, 0x45, 0xfd, 0x08, 0xc5, 0x61, 0x6a, 0x67, 0xf2,
0x80, 0x83, 0x52, 0x1a, 0xb1, 0x10, 0x46, 0xd0, 0xe3, 0x08, 0xc5, 0xa3, 0x9b, 0x6b, 0x7e, 0xf0,
0xc1, 0xfc, 0xd1, 0xe9, 0x73, 0x7f, 0xf7, 0x7d, 0xe9, 0xa5, 0xfd, 0xfa, 0x78, 0x8a, 0x83, 0xbf,
0x7f, 0xe4, 0x0a, 0x87, 0xf6, 0xe8, 0x4b, 0x7b, 0x44, 0x02, 0x45, 0xd1, 0x20, 0x0e, 0xd3, 0x91,
0x65, 0xf7, 0x16, 0xcd, 0xef, 0x76, 0x35, 0x43, 0xfb, 0x9a, 0xa1, 0x9f, 0x9a, 0xa1, 0x8f, 0x86,
0x79, 0xfb, 0x86, 0x79, 0x5f, 0x0d, 0xf3, 0x9e, 0x27, 0xc5, 0xd2, 0xa8, 0x6d, 0xc6, 0x73, 0x5d,
0x26, 0xae, 0xea, 0xee, 0x33, 0x85, 0xc5, 0xea, 0x5f, 0xe1, 0xe6, 0xbd, 0x92, 0x90, 0x0d, 0x6d,
0x63, 0xb7, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x89, 0x2e, 0x8f, 0x96, 0x01, 0x00, 0x00,
}
func (m *Snapshot) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Snapshot) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Snapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
{
size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSnapshot(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
if len(m.Hash) > 0 {
i -= len(m.Hash)
copy(dAtA[i:], m.Hash)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Hash)))
i--
dAtA[i] = 0x22
}
if m.Chunks != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Chunks))
i--
dAtA[i] = 0x18
}
if m.Format != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Format))
i--
dAtA[i] = 0x10
}
if m.Height != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Height))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *Metadata) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Metadata) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.ChunkHashes) > 0 {
for iNdEx := len(m.ChunkHashes) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.ChunkHashes[iNdEx])
copy(dAtA[i:], m.ChunkHashes[iNdEx])
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.ChunkHashes[iNdEx])))
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func encodeVarintSnapshot(dAtA []byte, offset int, v uint64) int {
offset -= sovSnapshot(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Snapshot) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Height != 0 {
n += 1 + sovSnapshot(uint64(m.Height))
}
if m.Format != 0 {
n += 1 + sovSnapshot(uint64(m.Format))
}
if m.Chunks != 0 {
n += 1 + sovSnapshot(uint64(m.Chunks))
}
l = len(m.Hash)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
l = m.Metadata.Size()
n += 1 + l + sovSnapshot(uint64(l))
return n
}
func (m *Metadata) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.ChunkHashes) > 0 {
for _, b := range m.ChunkHashes {
l = len(b)
n += 1 + l + sovSnapshot(uint64(l))
}
}
return n
}
func sovSnapshot(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozSnapshot(x uint64) (n int) {
return sovSnapshot(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Snapshot) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Snapshot: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Snapshot: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
}
m.Height = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Height |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Format", wireType)
}
m.Format = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Format |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType)
}
m.Chunks = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Chunks |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Hash = append(m.Hash[:0], dAtA[iNdEx:postIndex]...)
if m.Hash == nil {
m.Hash = []byte{}
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Metadata) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Metadata: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Metadata: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ChunkHashes", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ChunkHashes = append(m.ChunkHashes, make([]byte, postIndex-iNdEx))
copy(m.ChunkHashes[len(m.ChunkHashes)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipSnapshot(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSnapshot
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSnapshot
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSnapshot
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthSnapshot
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupSnapshot
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthSnapshot
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthSnapshot = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowSnapshot = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupSnapshot = fmt.Errorf("proto: unexpected end of group")
)

View File

@@ -0,0 +1,16 @@
package types
import "io"
// Snapshotter is something that can create and restore snapshots, consisting of streamed binary
// chunks - all of which must be read from the channel and closed. If an unsupported format is
// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf).
type Snapshotter interface {
// Snapshot creates a state snapshot, returning a channel of snapshot chunk readers.
Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error)
// Restore restores a state snapshot, taking snapshot chunk readers as input.
// If the ready channel is non-nil, it returns a ready signal (by being closed) once the
// restorer is ready to accept chunks.
Restore(height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}) error
}

View File

@@ -0,0 +1,851 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: cosmos/base/store/v1beta1/snapshot.proto
package types
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// SnapshotItem is an item contained in a rootmulti.Store snapshot.
type SnapshotItem struct {
// item is the specific type of snapshot item.
//
// Types that are valid to be assigned to Item:
// *SnapshotItem_Store
// *SnapshotItem_IAVL
Item isSnapshotItem_Item `protobuf_oneof:"item"`
}
func (m *SnapshotItem) Reset() { *m = SnapshotItem{} }
func (m *SnapshotItem) String() string { return proto.CompactTextString(m) }
func (*SnapshotItem) ProtoMessage() {}
func (*SnapshotItem) Descriptor() ([]byte, []int) {
return fileDescriptor_9c55879db4cc4502, []int{0}
}
func (m *SnapshotItem) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SnapshotItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SnapshotItem.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SnapshotItem) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotItem.Merge(m, src)
}
func (m *SnapshotItem) XXX_Size() int {
return m.Size()
}
func (m *SnapshotItem) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotItem.DiscardUnknown(m)
}
var xxx_messageInfo_SnapshotItem proto.InternalMessageInfo
type isSnapshotItem_Item interface {
isSnapshotItem_Item()
MarshalTo([]byte) (int, error)
Size() int
}
type SnapshotItem_Store struct {
Store *SnapshotStoreItem `protobuf:"bytes,1,opt,name=store,proto3,oneof" json:"store,omitempty"`
}
type SnapshotItem_IAVL struct {
IAVL *SnapshotIAVLItem `protobuf:"bytes,2,opt,name=iavl,proto3,oneof" json:"iavl,omitempty"`
}
func (*SnapshotItem_Store) isSnapshotItem_Item() {}
func (*SnapshotItem_IAVL) isSnapshotItem_Item() {}
func (m *SnapshotItem) GetItem() isSnapshotItem_Item {
if m != nil {
return m.Item
}
return nil
}
func (m *SnapshotItem) GetStore() *SnapshotStoreItem {
if x, ok := m.GetItem().(*SnapshotItem_Store); ok {
return x.Store
}
return nil
}
func (m *SnapshotItem) GetIAVL() *SnapshotIAVLItem {
if x, ok := m.GetItem().(*SnapshotItem_IAVL); ok {
return x.IAVL
}
return nil
}
// XXX_OneofWrappers is for the internal use of the proto package.
func (*SnapshotItem) XXX_OneofWrappers() []interface{} {
return []interface{}{
(*SnapshotItem_Store)(nil),
(*SnapshotItem_IAVL)(nil),
}
}
// SnapshotStoreItem contains metadata about a snapshotted store.
type SnapshotStoreItem struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
func (m *SnapshotStoreItem) Reset() { *m = SnapshotStoreItem{} }
func (m *SnapshotStoreItem) String() string { return proto.CompactTextString(m) }
func (*SnapshotStoreItem) ProtoMessage() {}
func (*SnapshotStoreItem) Descriptor() ([]byte, []int) {
return fileDescriptor_9c55879db4cc4502, []int{1}
}
func (m *SnapshotStoreItem) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SnapshotStoreItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SnapshotStoreItem.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SnapshotStoreItem) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotStoreItem.Merge(m, src)
}
func (m *SnapshotStoreItem) XXX_Size() int {
return m.Size()
}
func (m *SnapshotStoreItem) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotStoreItem.DiscardUnknown(m)
}
var xxx_messageInfo_SnapshotStoreItem proto.InternalMessageInfo
func (m *SnapshotStoreItem) GetName() string {
if m != nil {
return m.Name
}
return ""
}
// SnapshotIAVLItem is an exported IAVL node.
type SnapshotIAVLItem struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Version int64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Height int32 `protobuf:"varint,4,opt,name=height,proto3" json:"height,omitempty"`
}
func (m *SnapshotIAVLItem) Reset() { *m = SnapshotIAVLItem{} }
func (m *SnapshotIAVLItem) String() string { return proto.CompactTextString(m) }
func (*SnapshotIAVLItem) ProtoMessage() {}
func (*SnapshotIAVLItem) Descriptor() ([]byte, []int) {
return fileDescriptor_9c55879db4cc4502, []int{2}
}
func (m *SnapshotIAVLItem) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SnapshotIAVLItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SnapshotIAVLItem.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SnapshotIAVLItem) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotIAVLItem.Merge(m, src)
}
func (m *SnapshotIAVLItem) XXX_Size() int {
return m.Size()
}
func (m *SnapshotIAVLItem) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotIAVLItem.DiscardUnknown(m)
}
var xxx_messageInfo_SnapshotIAVLItem proto.InternalMessageInfo
func (m *SnapshotIAVLItem) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *SnapshotIAVLItem) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *SnapshotIAVLItem) GetVersion() int64 {
if m != nil {
return m.Version
}
return 0
}
func (m *SnapshotIAVLItem) GetHeight() int32 {
if m != nil {
return m.Height
}
return 0
}
func init() {
proto.RegisterType((*SnapshotItem)(nil), "cosmos.base.store.v1beta1.SnapshotItem")
proto.RegisterType((*SnapshotStoreItem)(nil), "cosmos.base.store.v1beta1.SnapshotStoreItem")
proto.RegisterType((*SnapshotIAVLItem)(nil), "cosmos.base.store.v1beta1.SnapshotIAVLItem")
}
func init() {
proto.RegisterFile("cosmos/base/store/v1beta1/snapshot.proto", fileDescriptor_9c55879db4cc4502)
}
var fileDescriptor_9c55879db4cc4502 = []byte{
// 324 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x30,
0x18, 0xc7, 0x1b, 0xd7, 0x4d, 0xfd, 0xdc, 0x61, 0x86, 0x21, 0xd5, 0x43, 0x1d, 0xbb, 0x58, 0x50,
0x13, 0xa6, 0x4f, 0x60, 0xf1, 0xb0, 0xa1, 0xa7, 0x0c, 0x3c, 0x78, 0x4b, 0x67, 0x68, 0xcb, 0xd6,
0x65, 0x2c, 0x59, 0x61, 0x6f, 0xe1, 0x6b, 0xf8, 0x26, 0x1e, 0x77, 0xf4, 0x24, 0xd2, 0xbd, 0x88,
0x24, 0xe9, 0x2e, 0x8a, 0xe0, 0xa9, 0xdf, 0xbf, 0xfc, 0xfe, 0xbf, 0x7c, 0xf0, 0x41, 0x34, 0x91,
0xaa, 0x90, 0x8a, 0x26, 0x5c, 0x09, 0xaa, 0xb4, 0x5c, 0x0a, 0x5a, 0x0e, 0x12, 0xa1, 0xf9, 0x80,
0xaa, 0x39, 0x5f, 0xa8, 0x4c, 0x6a, 0xb2, 0x58, 0x4a, 0x2d, 0xf1, 0xa9, 0x23, 0x89, 0x21, 0x89,
0x25, 0x49, 0x4d, 0x9e, 0x75, 0x53, 0x99, 0x4a, 0x4b, 0x51, 0x33, 0xb9, 0x42, 0xff, 0x0d, 0x41,
0x7b, 0x5c, 0x3b, 0x46, 0x5a, 0x14, 0xf8, 0x1e, 0x9a, 0xb6, 0x17, 0xa0, 0x1e, 0x8a, 0x8e, 0x6e,
0xae, 0xc8, 0x9f, 0x46, 0xb2, 0xeb, 0x8d, 0xcd, 0x5f, 0x53, 0x1e, 0x7a, 0xcc, 0x95, 0xf1, 0x03,
0xf8, 0x39, 0x2f, 0x67, 0xc1, 0x9e, 0x95, 0x5c, 0xfe, 0x43, 0x32, 0xba, 0x7b, 0x7a, 0x34, 0x8e,
0xf8, 0xa0, 0xfa, 0x3c, 0xf7, 0x4d, 0x1a, 0x7a, 0xcc, 0x4a, 0xe2, 0x16, 0xf8, 0xb9, 0x16, 0x45,
0xff, 0x02, 0x8e, 0x7f, 0x3d, 0x89, 0x31, 0xf8, 0x73, 0x5e, 0xb8, 0x75, 0x0f, 0x99, 0x9d, 0xfb,
0x33, 0xe8, 0xfc, 0xd4, 0xe2, 0x0e, 0x34, 0xa6, 0x62, 0x6d, 0xb1, 0x36, 0x33, 0x23, 0xee, 0x42,
0xb3, 0xe4, 0xb3, 0x95, 0xb0, 0x4b, 0xb6, 0x99, 0x0b, 0x38, 0x80, 0xfd, 0x52, 0x2c, 0x55, 0x2e,
0xe7, 0x41, 0xa3, 0x87, 0xa2, 0x06, 0xdb, 0x45, 0x7c, 0x02, 0xad, 0x4c, 0xe4, 0x69, 0xa6, 0x03,
0xbf, 0x87, 0xa2, 0x26, 0xab, 0x53, 0x1c, 0xbf, 0x57, 0x21, 0xda, 0x54, 0x21, 0xfa, 0xaa, 0x42,
0xf4, 0xba, 0x0d, 0xbd, 0xcd, 0x36, 0xf4, 0x3e, 0xb6, 0xa1, 0xf7, 0x1c, 0xa5, 0xb9, 0xce, 0x56,
0x09, 0x99, 0xc8, 0x82, 0xd6, 0x27, 0x74, 0x9f, 0x6b, 0xf5, 0x32, 0xad, 0x0f, 0xa9, 0xd7, 0x0b,
0xa1, 0x92, 0x96, 0xbd, 0xc6, 0xed, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x75, 0x87, 0x24, 0x7b,
0xea, 0x01, 0x00, 0x00,
}
func (m *SnapshotItem) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SnapshotItem) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotItem) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Item != nil {
{
size := m.Item.Size()
i -= size
if _, err := m.Item.MarshalTo(dAtA[i:]); err != nil {
return 0, err
}
}
}
return len(dAtA) - i, nil
}
func (m *SnapshotItem_Store) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotItem_Store) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.Store != nil {
{
size, err := m.Store.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSnapshot(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SnapshotItem_IAVL) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotItem_IAVL) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.IAVL != nil {
{
size, err := m.IAVL.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSnapshot(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
return len(dAtA) - i, nil
}
func (m *SnapshotStoreItem) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SnapshotStoreItem) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotStoreItem) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SnapshotIAVLItem) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SnapshotIAVLItem) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotIAVLItem) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Height != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Height))
i--
dAtA[i] = 0x20
}
if m.Version != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Version))
i--
dAtA[i] = 0x18
}
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Key) > 0 {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SnapshotItem) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Item != nil {
n += m.Item.Size()
}
return n
}
func (m *SnapshotItem_Store) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Store != nil {
l = m.Store.Size()
n += 1 + l + sovSnapshot(uint64(l))
}
return n
}
func (m *SnapshotItem_IAVL) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.IAVL != nil {
l = m.IAVL.Size()
n += 1 + l + sovSnapshot(uint64(l))
}
return n
}
func (m *SnapshotStoreItem) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
return n
}
func (m *SnapshotIAVLItem) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Key)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
if m.Version != 0 {
n += 1 + sovSnapshot(uint64(m.Version))
}
if m.Height != 0 {
n += 1 + sovSnapshot(uint64(m.Height))
}
return n
}
func (m *SnapshotItem) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SnapshotItem: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SnapshotItem: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Store", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &SnapshotStoreItem{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Item = &SnapshotItem_Store{v}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field IAVL", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &SnapshotIAVLItem{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Item = &SnapshotItem_IAVL{v}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *SnapshotStoreItem) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SnapshotStoreItem: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SnapshotStoreItem: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *SnapshotIAVLItem) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SnapshotIAVLItem: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SnapshotIAVLItem: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...)
if m.Value == nil {
m.Value = []byte{}
}
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
m.Version = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Version |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
}
m.Height = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Height |= int32(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}

View File

@@ -0,0 +1,162 @@
package snapshots
import (
"errors"
"io"
)
// ChunkWriter reads an input stream, splits it into fixed-size chunks, and writes them to a
// sequence of io.ReadClosers via a channel.
type ChunkWriter struct {
ch chan<- io.ReadCloser
pipe *io.PipeWriter
chunkSize uint64
written uint64
closed bool
}
// NewChunkWriter creates a new ChunkWriter. If chunkSize is 0, no chunking will be done.
func NewChunkWriter(ch chan<- io.ReadCloser, chunkSize uint64) *ChunkWriter {
return &ChunkWriter{
ch: ch,
chunkSize: chunkSize,
}
}
// chunk creates a new chunk.
func (w *ChunkWriter) chunk() error {
if w.pipe != nil {
err := w.pipe.Close()
if err != nil {
return err
}
}
pr, pw := io.Pipe()
w.ch <- pr
w.pipe = pw
w.written = 0
return nil
}
// Close implements io.Closer.
func (w *ChunkWriter) Close() error {
if !w.closed {
w.closed = true
close(w.ch)
var err error
if w.pipe != nil {
err = w.pipe.Close()
}
return err
}
return nil
}
// CloseWithError closes the writer and sends an error to the reader.
func (w *ChunkWriter) CloseWithError(err error) {
if !w.closed {
w.closed = true
close(w.ch)
if w.pipe != nil {
w.pipe.CloseWithError(err)
}
}
}
// Write implements io.Writer.
func (w *ChunkWriter) Write(data []byte) (int, error) {
if w.closed {
return 0, errors.New("cannot write to closed ChunkWriter")
}
nTotal := 0
for len(data) > 0 {
if w.pipe == nil || (w.written >= w.chunkSize && w.chunkSize > 0) {
err := w.chunk()
if err != nil {
return nTotal, err
}
}
var writeSize uint64
if w.chunkSize == 0 {
writeSize = uint64(len(data))
} else {
writeSize = w.chunkSize - w.written
}
if writeSize > uint64(len(data)) {
writeSize = uint64(len(data))
}
n, err := w.pipe.Write(data[:writeSize])
w.written += uint64(n)
nTotal += n
if err != nil {
return nTotal, err
}
data = data[writeSize:]
}
return nTotal, nil
}
// ChunkReader reads chunks from a channel of io.ReadClosers and outputs them as an io.Reader
type ChunkReader struct {
ch <-chan io.ReadCloser
reader io.ReadCloser
}
// NewChunkReader creates a new ChunkReader.
func NewChunkReader(ch <-chan io.ReadCloser) *ChunkReader {
return &ChunkReader{ch: ch}
}
// next fetches the next chunk from the channel, or returns io.EOF if there are no more chunks.
func (r *ChunkReader) next() error {
reader, ok := <-r.ch
if !ok {
return io.EOF
}
r.reader = reader
return nil
}
// Close implements io.ReadCloser.
func (r *ChunkReader) Close() error {
var err error
if r.reader != nil {
err = r.reader.Close()
r.reader = nil
}
for reader := range r.ch {
if e := reader.Close(); e != nil && err == nil {
err = e
}
}
return err
}
// Read implements io.Reader.
func (r *ChunkReader) Read(p []byte) (int, error) {
if r.reader == nil {
err := r.next()
if err != nil {
return 0, err
}
}
n, err := r.reader.Read(p)
if err == io.EOF {
err = r.reader.Close()
r.reader = nil
if err != nil {
return 0, err
}
return r.Read(p)
}
return n, err
}
// DrainChunks drains and closes all remaining chunks from a chunk channel.
func DrainChunks(chunks <-chan io.ReadCloser) {
for chunk := range chunks {
_ = chunk.Close()
}
}

View File

@@ -0,0 +1,166 @@
package snapshots_test
import (
"bytes"
"errors"
"io"
"io/ioutil"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/iavlstore/snapshots"
)
func TestChunkWriter(t *testing.T) {
ch := make(chan io.ReadCloser, 100)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, 2)
n, err := chunkWriter.Write([]byte{1, 2, 3})
require.NoError(t, err)
assert.Equal(t, 3, n)
n, err = chunkWriter.Write([]byte{4, 5, 6})
require.NoError(t, err)
assert.Equal(t, 3, n)
n, err = chunkWriter.Write([]byte{7, 8, 9})
require.NoError(t, err)
assert.Equal(t, 3, n)
err = chunkWriter.Close()
require.NoError(t, err)
// closed writer should error
_, err = chunkWriter.Write([]byte{10})
require.Error(t, err)
// closing again should be fine
err = chunkWriter.Close()
require.NoError(t, err)
}()
assert.Equal(t, [][]byte{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9}}, readChunks(ch))
// 0-sized chunks should return the whole body as one chunk
ch = make(chan io.ReadCloser, 100)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, 0)
_, err := chunkWriter.Write([]byte{1, 2, 3})
require.NoError(t, err)
_, err = chunkWriter.Write([]byte{4, 5, 6})
require.NoError(t, err)
err = chunkWriter.Close()
require.NoError(t, err)
}()
assert.Equal(t, [][]byte{{1, 2, 3, 4, 5, 6}}, readChunks(ch))
// closing with error should return the error
theErr := errors.New("boom")
ch = make(chan io.ReadCloser, 100)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, 2)
_, err := chunkWriter.Write([]byte{1, 2, 3})
require.NoError(t, err)
chunkWriter.CloseWithError(theErr)
}()
chunk, err := ioutil.ReadAll(<-ch)
require.NoError(t, err)
assert.Equal(t, []byte{1, 2}, chunk)
_, err = ioutil.ReadAll(<-ch)
require.Error(t, err)
assert.Equal(t, theErr, err)
assert.Empty(t, ch)
// closing immediately should return no chunks
ch = make(chan io.ReadCloser, 100)
chunkWriter := snapshots.NewChunkWriter(ch, 2)
err = chunkWriter.Close()
require.NoError(t, err)
assert.Empty(t, ch)
}
func TestChunkReader(t *testing.T) {
ch := makeChunks([][]byte{
{1, 2, 3},
{4},
{},
{5, 6},
})
chunkReader := snapshots.NewChunkReader(ch)
buf := []byte{0, 0, 0, 0}
n, err := chunkReader.Read(buf)
require.NoError(t, err)
assert.Equal(t, 3, n)
assert.Equal(t, []byte{1, 2, 3, 0}, buf)
buf = []byte{0, 0, 0, 0}
n, err = chunkReader.Read(buf)
require.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, []byte{4, 0, 0, 0}, buf)
buf = []byte{0, 0, 0, 0}
n, err = chunkReader.Read(buf)
require.NoError(t, err)
assert.Equal(t, 2, n)
assert.Equal(t, []byte{5, 6, 0, 0}, buf)
buf = []byte{0, 0, 0, 0}
_, err = chunkReader.Read(buf)
require.Error(t, err)
assert.Equal(t, io.EOF, err)
err = chunkReader.Close()
require.NoError(t, err)
err = chunkReader.Close() // closing twice should be fine
require.NoError(t, err)
// Empty channel should be fine
ch = makeChunks(nil)
chunkReader = snapshots.NewChunkReader(ch)
buf = make([]byte, 4)
_, err = chunkReader.Read(buf)
require.Error(t, err)
assert.Equal(t, io.EOF, err)
// Using a pipe that closes with an error should return the error
theErr := errors.New("boom")
pr, pw := io.Pipe()
pch := make(chan io.ReadCloser, 1)
pch <- pr
pw.CloseWithError(theErr)
chunkReader = snapshots.NewChunkReader(pch)
buf = make([]byte, 4)
_, err = chunkReader.Read(buf)
require.Error(t, err)
assert.Equal(t, theErr, err)
// Closing the reader should close the writer
pr, pw = io.Pipe()
pch = make(chan io.ReadCloser, 2)
pch <- ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
pch <- pr
close(pch)
go func() {
chunkReader = snapshots.NewChunkReader(pch)
buf = []byte{0, 0, 0, 0}
_, err = chunkReader.Read(buf)
require.NoError(t, err)
assert.Equal(t, []byte{1, 2, 3, 0}, buf)
err = chunkReader.Close()
require.NoError(t, err)
}()
_, err = pw.Write([]byte{9, 9, 9})
require.Error(t, err)
assert.Equal(t, err, io.ErrClosedPipe)
}

5
go.mod
View File

@@ -15,14 +15,19 @@ require (
github.com/gtank/merlin v0.1.1
github.com/libp2p/go-buffer-pool v0.0.2
github.com/minio/highwayhash v1.0.0
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/rs/cors v1.7.0
github.com/sasha-s/go-deadlock v0.2.0
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
github.com/spf13/afero v1.2.1 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/tendermint/tm-db v0.6.2
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a

10
go.sum
View File

@@ -362,6 +362,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4=
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
@@ -449,6 +451,8 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.1 h1:qgMbHoJbPbw579P+1zVY+6n4nIFuIchaIjzZ/I/Yq8M=
github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
@@ -458,9 +462,13 @@ github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
@@ -471,6 +479,8 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=