From 342651b89e3cd2e007f4ec0079d32f93c410f29c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 1 Sep 2020 21:12:06 +0200 Subject: [PATCH] iavlstore: initial state sync implementation --- Makefile | 2 +- abci/example/iavlstore/iavlstore.go | 218 ++++- .../iavlstore/snapshots/helpers_test.go | 152 ++++ .../iavlstore/snapshots/iavlsnapshotter.go | 244 +++++ abci/example/iavlstore/snapshots/manager.go | 260 ++++++ .../iavlstore/snapshots/manager_test.go | 221 +++++ abci/example/iavlstore/snapshots/store.go | 366 ++++++++ .../example/iavlstore/snapshots/store_test.go | 366 ++++++++ .../iavlstore/snapshots/types/convert.go | 39 + .../iavlstore/snapshots/types/errors.go | 16 + .../iavlstore/snapshots/types/format.go | 6 + .../iavlstore/snapshots/types/snapshot.pb.go | 667 ++++++++++++++ .../iavlstore/snapshots/types/snapshotter.go | 16 + .../iavlstore/snapshots/types/store.pb.go | 851 ++++++++++++++++++ abci/example/iavlstore/snapshots/util.go | 162 ++++ abci/example/iavlstore/snapshots/util_test.go | 166 ++++ go.mod | 5 + go.sum | 10 + 18 files changed, 3722 insertions(+), 45 deletions(-) create mode 100644 abci/example/iavlstore/snapshots/helpers_test.go create mode 100644 abci/example/iavlstore/snapshots/iavlsnapshotter.go create mode 100644 abci/example/iavlstore/snapshots/manager.go create mode 100644 abci/example/iavlstore/snapshots/manager_test.go create mode 100644 abci/example/iavlstore/snapshots/store.go create mode 100644 abci/example/iavlstore/snapshots/store_test.go create mode 100644 abci/example/iavlstore/snapshots/types/convert.go create mode 100644 abci/example/iavlstore/snapshots/types/errors.go create mode 100644 abci/example/iavlstore/snapshots/types/format.go create mode 100644 abci/example/iavlstore/snapshots/types/snapshot.pb.go create mode 100644 abci/example/iavlstore/snapshots/types/snapshotter.go create mode 100644 abci/example/iavlstore/snapshots/types/store.pb.go create mode 100644 abci/example/iavlstore/snapshots/util.go create mode 100644 abci/example/iavlstore/snapshots/util_test.go diff --git a/Makefile b/Makefile index 46abb868a..e74fdb105 100644 --- a/Makefile +++ b/Makefile @@ -218,7 +218,7 @@ build_c-amazonlinux: # Run a 4-node testnet locally localnet-start: localnet-stop build-docker-localnode @if ! [ -f build/node0/config/genesis.json ]; then docker run --rm -v $(CURDIR)/build:/tendermint:Z tendermint/localnode testnet --config /etc/tendermint/config-template.toml --o . --starting-ip-address 192.167.10.2; fi - docker-compose up + docker-compose up node0 node1 node2 .PHONY: localnet-start # Stop testnet diff --git a/abci/example/iavlstore/iavlstore.go b/abci/example/iavlstore/iavlstore.go index 30ea0f046..c09aad09e 100644 --- a/abci/example/iavlstore/iavlstore.go +++ b/abci/example/iavlstore/iavlstore.go @@ -4,23 +4,31 @@ import ( "bytes" "errors" "fmt" + "os" + "path/filepath" "github.com/cosmos/iavl" dbm "github.com/tendermint/tm-db" "github.com/tendermint/tendermint/abci/example/code" - "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots" + snapshottypes "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/version" ) -var _ types.Application = (*Application)(nil) +var _ abci.Application = (*Application)(nil) type Application struct { - types.BaseApplication - store *iavl.MutableTree + abci.BaseApplication + store *iavl.MutableTree + snapshotManager *snapshots.Manager + logger log.Logger } func NewApplication(dataDir string) *Application { + // Set up IAVL store db, err := dbm.NewGoLevelDB("iavlstore", dataDir) if err != nil { panic(err) @@ -33,13 +41,27 @@ func NewApplication(dataDir string) *Application { if err != nil { panic(err) } + + // Set up snapshot storage + snapshotDir := filepath.Join(dataDir, "snapshots") + snapshotDB, err := dbm.NewGoLevelDB("metadata", snapshotDir) + if err != nil { + panic(err) + } + snapshotStore, err := snapshots.NewStore(snapshotDB, snapshotDir) + if err != nil { + panic(err) + } + return &Application{ - store: store, + store: store, + snapshotManager: snapshots.NewManager(snapshotStore, &snapshots.IAVLSnapshotter{MutableTree: store}), + logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)), } } -func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) { - return types.ResponseInfo{ +func (app *Application) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { + return abci.ResponseInfo{ Data: fmt.Sprintf(`{"size":%v}`, app.store.Size()), Version: version.ABCIVersion, AppVersion: 1, @@ -48,43 +70,6 @@ func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) } } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { - _, _, err := parseTx(req.Tx) - if err != nil { - return types.ResponseCheckTx{ - Code: code.CodeTypeEncodingError, - Log: err.Error(), - } - } - return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} -} - -func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverTx { - key, value, err := parseTx(req.Tx) - if err != nil { - panic(err) - } - app.store.Set(key, value) - return types.ResponseDeliverTx{Code: code.CodeTypeOK} -} - -func (app *Application) Commit() types.ResponseCommit { - hash, _, err := app.store.SaveVersion() - if err != nil { - panic(err) - } - return types.ResponseCommit{Data: hash} -} - -func (app *Application) Query(req types.RequestQuery) types.ResponseQuery { - _, value := app.store.Get(req.Data) - return types.ResponseQuery{ - Height: app.store.Version(), - Key: req.Data, - Value: value, - } -} - // parseTx parses a tx in 'key=value' format into a key and value. func parseTx(tx []byte) ([]byte, []byte, error) { parts := bytes.Split(tx, []byte("=")) @@ -96,3 +81,148 @@ func parseTx(tx []byte) ([]byte, []byte, error) { } return parts[0], parts[1], nil } + +func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { + _, _, err := parseTx(req.Tx) + if err != nil { + return abci.ResponseCheckTx{ + Code: code.CodeTypeEncodingError, + Log: err.Error(), + } + } + return abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} +} + +func (app *Application) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { + key, value, err := parseTx(req.Tx) + if err != nil { + panic(err) + } + app.store.Set(key, value) + return abci.ResponseDeliverTx{Code: code.CodeTypeOK} +} + +func (app *Application) Commit() abci.ResponseCommit { + hash, height, err := app.store.SaveVersion() + if err != nil { + panic(err) + } + + // Take asynchronous state sync snapshot every 10 blocks + if height%10 == 0 { + go func() { + app.logger.Info("Creating state snapshot", "height", height) + snapshot, err := app.snapshotManager.Create(uint64(height)) + if err != nil { + app.logger.Error("Failed to create state snapshot", "height", height, "err", err) + return + } + app.logger.Info("Completed state snapshot", "height", height, "format", snapshot.Format) + }() + } + + return abci.ResponseCommit{Data: hash} +} + +func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery { + _, value := app.store.Get(req.Data) + return abci.ResponseQuery{ + Height: app.store.Version(), + Key: req.Data, + Value: value, + } +} + +// ListSnapshots implements the ABCI interface. It delegates to app.snapshotManager if set. +func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots { + resp := abci.ResponseListSnapshots{Snapshots: []*abci.Snapshot{}} + if app.snapshotManager == nil { + return resp + } + + snapshots, err := app.snapshotManager.List() + if err != nil { + app.logger.Error("Failed to list snapshots", "err", err) + return resp + } + for _, snapshot := range snapshots { + abciSnapshot, err := snapshot.ToABCI() + if err != nil { + app.logger.Error("Failed to list snapshots", "err", err) + return resp + } + resp.Snapshots = append(resp.Snapshots, &abciSnapshot) + } + + return resp +} + +// LoadSnapshotChunk implements the ABCI interface. It delegates to app.snapshotManager if set. +func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk { + if app.snapshotManager == nil { + return abci.ResponseLoadSnapshotChunk{} + } + chunk, err := app.snapshotManager.LoadChunk(req.Height, req.Format, req.Chunk) + if err != nil { + app.logger.Error("Failed to load snapshot chunk", "height", req.Height, "format", req.Format, + "chunk", req.Chunk, "err") + return abci.ResponseLoadSnapshotChunk{} + } + return abci.ResponseLoadSnapshotChunk{Chunk: chunk} +} + +// OfferSnapshot implements the ABCI interface. It delegates to app.snapshotManager if set. +func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot { + if req.Snapshot == nil { + app.logger.Error("Received nil snapshot") + return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT} + } + + snapshot, err := snapshottypes.SnapshotFromABCI(req.Snapshot) + if err != nil { + app.logger.Error("Failed to decode snapshot metadata", "err", err) + return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT} + } + err = app.snapshotManager.Restore(snapshot) + switch { + case err == nil: + return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT} + + case errors.Is(err, snapshottypes.ErrUnknownFormat): + return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT} + + case errors.Is(err, snapshottypes.ErrInvalidMetadata): + app.logger.Error("Rejecting invalid snapshot", "height", req.Snapshot.Height, + "format", req.Snapshot.Format, "err", err) + return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT} + + default: + app.logger.Error("Failed to restore snapshot", "height", req.Snapshot.Height, + "format", req.Snapshot.Format, "err", err) + // We currently don't support resetting the IAVL stores and retrying a different snapshot, + // so we ask Tendermint to abort all snapshot restoration. + return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT} + } +} + +// ApplySnapshotChunk implements the ABCI interface. It delegates to app.snapshotManager if set. +func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk { + _, err := app.snapshotManager.RestoreChunk(req.Chunk) + switch { + case err == nil: + return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT} + + case errors.Is(err, snapshottypes.ErrChunkHashMismatch): + app.logger.Error("Chunk checksum mismatch, rejecting sender and requesting refetch", + "chunk", req.Index, "sender", req.Sender, "err", err) + return abci.ResponseApplySnapshotChunk{ + Result: abci.ResponseApplySnapshotChunk_RETRY, + RefetchChunks: []uint32{req.Index}, + RejectSenders: []string{req.Sender}, + } + + default: + app.logger.Error("Failed to restore snapshot", "err", err) + return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ABORT} + } +} diff --git a/abci/example/iavlstore/snapshots/helpers_test.go b/abci/example/iavlstore/snapshots/helpers_test.go new file mode 100644 index 000000000..1954232ac --- /dev/null +++ b/abci/example/iavlstore/snapshots/helpers_test.go @@ -0,0 +1,152 @@ +package snapshots_test + +import ( + "bytes" + "crypto/sha256" + "errors" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + db "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots" + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" +) + +func checksum(b []byte) []byte { + hash := sha256.Sum256(b) + return hash[:] +} + +func checksums(slice [][]byte) [][]byte { + checksums := [][]byte{} + for _, chunk := range slice { + checksums = append(checksums, checksum(chunk)) + } + return checksums +} + +func hash(chunks [][]byte) []byte { + hasher := sha256.New() + for _, chunk := range chunks { + hasher.Write(chunk) + } + return hasher.Sum(nil) +} + +func makeChunks(chunks [][]byte) <-chan io.ReadCloser { + ch := make(chan io.ReadCloser, len(chunks)) + for _, chunk := range chunks { + ch <- ioutil.NopCloser(bytes.NewReader(chunk)) + } + close(ch) + return ch +} + +func readChunks(chunks <-chan io.ReadCloser) [][]byte { + bodies := [][]byte{} + for chunk := range chunks { + body, err := ioutil.ReadAll(chunk) + if err != nil { + panic(err) + } + bodies = append(bodies, body) + } + return bodies +} + +type mockSnapshotter struct { + chunks [][]byte +} + +func (m *mockSnapshotter) Restore( + height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}, +) error { + if format == 0 { + return types.ErrUnknownFormat + } + if m.chunks != nil { + return errors.New("already has contents") + } + if ready != nil { + close(ready) + } + + m.chunks = [][]byte{} + for reader := range chunks { + chunk, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + m.chunks = append(m.chunks, chunk) + } + + return nil +} + +func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) { + if format == 0 { + return nil, types.ErrUnknownFormat + } + ch := make(chan io.ReadCloser, len(m.chunks)) + for _, chunk := range m.chunks { + ch <- ioutil.NopCloser(bytes.NewReader(chunk)) + } + close(ch) + return ch, nil +} + +// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1. +// The snapshot will complete when the returned closer is called. +func setupBusyManager(t *testing.T) (*snapshots.Manager, func()) { + tempdir, err := ioutil.TempDir("", "") + require.NoError(t, err) + store, err := snapshots.NewStore(db.NewMemDB(), tempdir) + require.NoError(t, err) + hung := newHungSnapshotter() + mgr := snapshots.NewManager(store, hung) + + go func() { + _, err := mgr.Create(1) + require.NoError(t, err) + }() + time.Sleep(10 * time.Millisecond) + + closer := func() { + hung.Close() + os.RemoveAll(tempdir) + } + return mgr, closer +} + +// hungSnapshotter can be used to test operations in progress. Call close to end the snapshot. +type hungSnapshotter struct { + ch chan struct{} +} + +func newHungSnapshotter() *hungSnapshotter { + return &hungSnapshotter{ + ch: make(chan struct{}), + } +} + +func (m *hungSnapshotter) Close() { + close(m.ch) +} + +func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) { + <-m.ch + ch := make(chan io.ReadCloser, 1) + ch <- ioutil.NopCloser(bytes.NewReader([]byte{})) + return ch, nil +} + +func (m *hungSnapshotter) Restore( + height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}, +) error { + panic("not implemented") +} diff --git a/abci/example/iavlstore/snapshots/iavlsnapshotter.go b/abci/example/iavlstore/snapshots/iavlsnapshotter.go new file mode 100644 index 000000000..0cb199540 --- /dev/null +++ b/abci/example/iavlstore/snapshots/iavlsnapshotter.go @@ -0,0 +1,244 @@ +package snapshots + +import ( + "bufio" + "compress/zlib" + "errors" + "fmt" + "io" + "math" + "sort" + "strings" + + "github.com/cosmos/iavl" + snapshottypes "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" + "github.com/tendermint/tendermint/libs/protoio" +) + +const ( + snapshotChunkSize = uint64(10e6) + snapshotBufferSize = int(snapshotChunkSize) + snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit +) + +// IAVLSnapshotter is a wrapper around an IAVL store that does snapshotting. +// It implements the Snapshotter interface. +type IAVLSnapshotter struct { + *iavl.MutableTree +} + +// Snapshot implements Snapshotter. +func (tree *IAVLSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) { + if format != snapshottypes.CurrentFormat { + return nil, fmt.Errorf("%w %v", snapshottypes.ErrUnknownFormat, format) + } + if height == 0 { + return nil, errors.New("cannot snapshot height 0") + } + if height > uint64(tree.Version()) { + return nil, fmt.Errorf("cannot snapshot future height %v", height) + } + + // Collect stores to snapshot (only IAVL stores are supported) + type namedStore struct { + *iavl.MutableTree + name string + } + stores := []namedStore{{ + MutableTree: tree.MutableTree, + name: "iavlstore", + }} + sort.Slice(stores, func(i, j int) bool { + return strings.Compare(stores[i].name, stores[j].name) == -1 + }) + + // Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel + ch := make(chan io.ReadCloser) + go func() { + // Set up a stream pipeline to serialize snapshot nodes: + // ExportNode -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser + chunkWriter := NewChunkWriter(ch, snapshotChunkSize) + defer chunkWriter.Close() + bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize) + defer func() { + if err := bufWriter.Flush(); err != nil { + chunkWriter.CloseWithError(err) + } + }() + zWriter, err := zlib.NewWriterLevel(bufWriter, 7) + if err != nil { + chunkWriter.CloseWithError(fmt.Errorf("zlib error: %w", err)) + return + } + defer func() { + if err := zWriter.Close(); err != nil { + chunkWriter.CloseWithError(err) + } + }() + protoWriter := protoio.NewDelimitedWriter(zWriter) + defer func() { + if err := protoWriter.Close(); err != nil { + chunkWriter.CloseWithError(err) + } + }() + + // Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf + // messages. The first item contains a SnapshotStore with store metadata (i.e. name), + // and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes + // are demarcated by new SnapshotStore items. + for _, store := range stores { + tree, err := store.GetImmutable(int64(height)) + if err != nil { + chunkWriter.CloseWithError(err) + return + } + exporter := tree.Export() + defer exporter.Close() + _, err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_Store{ + Store: &snapshottypes.SnapshotStoreItem{ + Name: store.name, + }, + }, + }) + if err != nil { + chunkWriter.CloseWithError(err) + return + } + + for { + node, err := exporter.Next() + if err == iavl.ExportDone { + break + } else if err != nil { + chunkWriter.CloseWithError(err) + return + } + _, err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ + Item: &snapshottypes.SnapshotItem_IAVL{ + IAVL: &snapshottypes.SnapshotIAVLItem{ + Key: node.Key, + Value: node.Value, + Height: int32(node.Height), + Version: node.Version, + }, + }, + }) + if err != nil { + chunkWriter.CloseWithError(err) + return + } + } + exporter.Close() + } + }() + + return ch, nil +} + +// Restore implements snapshottypes.Snapshotter. +func (tree *IAVLSnapshotter) Restore( + height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}, +) error { + if format != snapshottypes.CurrentFormat { + return fmt.Errorf("%w %v", snapshottypes.ErrUnknownFormat, format) + } + if height == 0 { + return fmt.Errorf("%w: cannot restore snapshot at height 0", snapshottypes.ErrInvalidMetadata) + } + if height > math.MaxInt64 { + return fmt.Errorf("%w: snapshot height %v cannot exceed %v", snapshottypes.ErrInvalidMetadata, + height, math.MaxInt64) + } + + // Signal readiness. Must be done before the readers below are set up, since the zlib + // reader reads from the stream on initialization, potentially causing deadlocks. + if ready != nil { + close(ready) + } + + // Set up a restore stream pipeline + // chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode + chunkReader := NewChunkReader(chunks) + defer chunkReader.Close() + zReader, err := zlib.NewReader(chunkReader) + if err != nil { + return fmt.Errorf("zlib error: %w", err) + } + defer zReader.Close() + protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize) + defer protoReader.Close() + + // Import nodes into stores. The first item is expected to be a SnapshotItem containing + // a SnapshotStoreItem, telling us which store to import into. The following items will contain + // SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF. + var importer *iavl.Importer + for { + item := &snapshottypes.SnapshotItem{} + err := protoReader.ReadMsg(item) + if err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("invalid protobuf message: %w", err) + } + + switch item := item.Item.(type) { + case *snapshottypes.SnapshotItem_Store: + if importer != nil { + err = importer.Commit() + if err != nil { + return fmt.Errorf("IAVL commit failed: %w", err) + } + importer.Close() + } + if item.Store.Name != "iavlstore" { + return fmt.Errorf("cannot import into non-IAVL store %q", item.Store.Name) + } + importer, err = tree.Import(int64(height)) + if err != nil { + return fmt.Errorf("import failed: %w", err) + } + defer importer.Close() + + case *snapshottypes.SnapshotItem_IAVL: + if importer == nil { + return fmt.Errorf("received IAVL node item before store item") + } + if item.IAVL.Height > math.MaxInt8 { + return fmt.Errorf("node height %v cannot exceed %v", item.IAVL.Height, math.MaxInt8) + } + node := &iavl.ExportNode{ + Key: item.IAVL.Key, + Value: item.IAVL.Value, + Height: int8(item.IAVL.Height), + Version: item.IAVL.Version, + } + // Protobuf does not differentiate between []byte{} as nil, but fortunately IAVL does + // not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. + if node.Key == nil { + node.Key = []byte{} + } + if node.Height == 0 && node.Value == nil { + node.Value = []byte{} + } + err := importer.Add(node) + if err != nil { + return fmt.Errorf("IAVL node import failed: %w", err) + } + + default: + return fmt.Errorf("unknown snapshot item %T", item) + } + } + + if importer != nil { + err := importer.Commit() + if err != nil { + return fmt.Errorf("IAVL commit failed: %w", err) + } + importer.Close() + } + + _, err = tree.Load() + return err +} diff --git a/abci/example/iavlstore/snapshots/manager.go b/abci/example/iavlstore/snapshots/manager.go new file mode 100644 index 000000000..1df81b74f --- /dev/null +++ b/abci/example/iavlstore/snapshots/manager.go @@ -0,0 +1,260 @@ +package snapshots + +import ( + "bytes" + "crypto/sha256" + "errors" + "fmt" + "io" + "io/ioutil" + "sync" + + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" +) + +const ( + opNone operation = "" + opSnapshot operation = "snapshot" + opPrune operation = "prune" + opRestore operation = "restore" + + chunkBufferSize = 4 +) + +// operation represents a Manager operation. Only one operation can be in progress at a time. +type operation string + +// restoreDone represents the result of a restore operation. +type restoreDone struct { + complete bool // if true, restore completed successfully (not prematurely) + err error // if non-nil, restore errored +} + +// Manager manages snapshot and restore operations for an app, making sure only a single +// long-running operation is in progress at any given time, and provides convenience methods +// mirroring the ABCI interface. +// +// Although the ABCI interface (and this manager) passes chunks as byte slices, the internal +// snapshot/restore APIs use IO streams (i.e. chan io.ReadCloser), for two reasons: +// +// 1) In the future, ABCI should support streaming. Consider e.g. InitChain during chain +// upgrades, which currently passes the entire chain state as an in-memory byte slice. +// https://github.com/tendermint/tendermint/issues/5184 +// +// 2) io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary +// errors via io.Pipe.CloseWithError(). +type Manager struct { + store *Store + target types.Snapshotter + + mtx sync.Mutex + operation operation + chRestore chan<- io.ReadCloser + chRestoreDone <-chan restoreDone + restoreChunkHashes [][]byte + restoreChunkIndex uint32 +} + +// NewManager creates a new manager. +func NewManager(store *Store, target types.Snapshotter) *Manager { + return &Manager{ + store: store, + target: target, + } +} + +// begin starts an operation, or errors if one is in progress. It manages the mutex itself. +func (m *Manager) begin(op operation) error { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.beginLocked(op) +} + +// beginLocked begins an operation while already holding the mutex. +func (m *Manager) beginLocked(op operation) error { + if op == opNone { + return errors.New("can't begin a none operation") + } + if m.operation != opNone { + return fmt.Errorf("a %v operation is in progress", m.operation) + } + m.operation = op + return nil +} + +// end ends the current operation. +func (m *Manager) end() { + m.mtx.Lock() + defer m.mtx.Unlock() + m.endLocked() +} + +// endLocked ends the current operation while already holding the mutex. +func (m *Manager) endLocked() { + m.operation = opNone + if m.chRestore != nil { + close(m.chRestore) + m.chRestore = nil + } + m.chRestoreDone = nil + m.restoreChunkHashes = nil + m.restoreChunkIndex = 0 +} + +// Create creates a snapshot and returns its metadata. +func (m *Manager) Create(height uint64) (*types.Snapshot, error) { + if m == nil { + return nil, errors.New("no snapshot store configured") + } + err := m.begin(opSnapshot) + if err != nil { + return nil, err + } + defer m.end() + + latest, err := m.store.GetLatest() + if err != nil { + return nil, fmt.Errorf("failed to examine latest snapshot: %w", err) + } + if latest != nil && latest.Height >= height { + return nil, fmt.Errorf("a more recent snapshot already exists at height %v", latest.Height) + } + + chunks, err := m.target.Snapshot(height, types.CurrentFormat) + if err != nil { + return nil, err + } + return m.store.Save(height, types.CurrentFormat, chunks) +} + +// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations. +func (m *Manager) List() ([]*types.Snapshot, error) { + return m.store.List() +} + +// LoadChunk loads a chunk into a byte slice, mirroring ABCI LoadChunk. It can be called +// concurrently with other operations. If the chunk does not exist, nil is returned. +func (m *Manager) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) { + reader, err := m.store.LoadChunk(height, format, chunk) + if err != nil { + return nil, err + } + if reader == nil { + return nil, nil + } + defer reader.Close() + + return ioutil.ReadAll(reader) +} + +// Prune prunes snapshots, if no other operations are in progress. +func (m *Manager) Prune(retain uint32) (uint64, error) { + err := m.begin(opPrune) + if err != nil { + return 0, err + } + defer m.end() + return m.store.Prune(retain) +} + +// Restore begins an async snapshot restoration, mirroring ABCI OfferSnapshot. Chunks must be fed +// via RestoreChunk() until the restore is complete or a chunk fails. +func (m *Manager) Restore(snapshot types.Snapshot) error { + if snapshot.Chunks == 0 { + return fmt.Errorf("%w: no chunks", types.ErrInvalidMetadata) + } + if uint32(len(snapshot.Metadata.ChunkHashes)) != snapshot.Chunks { + return fmt.Errorf("%w: snapshot has %v chunk hashes, but %v chunks", + types.ErrInvalidMetadata, + uint32(len(snapshot.Metadata.ChunkHashes)), + snapshot.Chunks) + } + m.mtx.Lock() + defer m.mtx.Unlock() + err := m.beginLocked(opRestore) + if err != nil { + return err + } + + // Start an asynchronous snapshot restoration, passing chunks and completion status via channels. + chChunks := make(chan io.ReadCloser, chunkBufferSize) + chReady := make(chan struct{}, 1) + chDone := make(chan restoreDone, 1) + go func() { + err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks, chReady) + chDone <- restoreDone{ + complete: err == nil, + err: err, + } + close(chDone) + }() + + // Check for any initial errors from the restore, before any chunks are fed. + select { + case done := <-chDone: + m.endLocked() + if done.err != nil { + return done.err + } + return errors.New("restore ended unexpectedly") + case <-chReady: + } + + m.chRestore = chChunks + m.chRestoreDone = chDone + m.restoreChunkHashes = snapshot.Metadata.ChunkHashes + m.restoreChunkIndex = 0 + return nil +} + +// RestoreChunk adds a chunk to an active snapshot restoration, mirroring ABCI ApplySnapshotChunk. +// Chunks must be given until the restore is complete, returning true, or a chunk errors. +func (m *Manager) RestoreChunk(chunk []byte) (bool, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + if m.operation != opRestore { + return false, fmt.Errorf("no restore operation in progress") + } + + if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + return false, errors.New("received unexpected chunk") + } + + // Check if any errors have occurred yet. + select { + case done := <-m.chRestoreDone: + m.endLocked() + if done.err != nil { + return false, done.err + } + return false, errors.New("restore ended unexpectedly") + default: + } + + // Verify the chunk hash. + hash := sha256.Sum256(chunk) + expected := m.restoreChunkHashes[m.restoreChunkIndex] + if !bytes.Equal(hash[:], expected) { + return false, fmt.Errorf("%w (expected %x, got %x)", + types.ErrChunkHashMismatch, hash, expected) + } + + // Pass the chunk to the restore, and wait for completion if it was the final one. + m.chRestore <- ioutil.NopCloser(bytes.NewReader(chunk)) + m.restoreChunkIndex++ + + if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) { + close(m.chRestore) + m.chRestore = nil + done := <-m.chRestoreDone + m.endLocked() + if done.err != nil { + return false, done.err + } + if !done.complete { + return false, errors.New("restore ended prematurely") + } + return true, nil + } + return false, nil +} diff --git a/abci/example/iavlstore/snapshots/manager_test.go b/abci/example/iavlstore/snapshots/manager_test.go new file mode 100644 index 000000000..727ff9150 --- /dev/null +++ b/abci/example/iavlstore/snapshots/manager_test.go @@ -0,0 +1,221 @@ +package snapshots_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots" + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" +) + +func TestManager_List(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + manager := snapshots.NewManager(store, nil) + + mgrList, err := manager.List() + require.NoError(t, err) + storeList, err := store.List() + require.NoError(t, err) + + require.NotEmpty(t, storeList) + assert.Equal(t, storeList, mgrList) + + // list should not block or error on busy managers + manager, teardown = setupBusyManager(t) + defer teardown() + list, err := manager.List() + require.NoError(t, err) + assert.Equal(t, []*types.Snapshot{}, list) +} + +func TestManager_LoadChunk(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + manager := snapshots.NewManager(store, nil) + + // Existing chunk should return body + chunk, err := manager.LoadChunk(2, 1, 1) + require.NoError(t, err) + assert.Equal(t, []byte{2, 1, 1}, chunk) + + // Missing chunk should return nil + chunk, err = manager.LoadChunk(2, 1, 9) + require.NoError(t, err) + assert.Nil(t, chunk) + + // LoadChunk should not block or error on busy managers + manager, teardown = setupBusyManager(t) + defer teardown() + chunk, err = manager.LoadChunk(2, 1, 0) + require.NoError(t, err) + assert.Nil(t, chunk) +} + +func TestManager_Take(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + snapshotter := &mockSnapshotter{ + chunks: [][]byte{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + } + manager := snapshots.NewManager(store, snapshotter) + + // nil manager should return error + _, err := (*snapshots.Manager)(nil).Create(1) + require.Error(t, err) + + // creating a snapshot at a lower height than the latest should error + _, err = manager.Create(3) + require.Error(t, err) + + // creating a snapshot at a higher height should be fine, and should return it + snapshot, err := manager.Create(5) + require.NoError(t, err) + assert.Equal(t, &types.Snapshot{ + Height: 5, + Format: types.CurrentFormat, + Chunks: 3, + Hash: []uint8{0x47, 0xe4, 0xee, 0x7f, 0x21, 0x1f, 0x73, 0x26, 0x5d, 0xd1, 0x76, 0x58, 0xf6, 0xe2, 0x1c, 0x13, 0x18, 0xbd, 0x6c, 0x81, 0xf3, 0x75, 0x98, 0xe2, 0xa, 0x27, 0x56, 0x29, 0x95, 0x42, 0xef, 0xcf}, + Metadata: types.Metadata{ + ChunkHashes: [][]byte{ + checksum([]byte{1, 2, 3}), + checksum([]byte{4, 5, 6}), + checksum([]byte{7, 8, 9}), + }, + }, + }, snapshot) + + storeSnapshot, chunks, err := store.Load(snapshot.Height, snapshot.Format) + require.NoError(t, err) + assert.Equal(t, snapshot, storeSnapshot) + assert.Equal(t, [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, readChunks(chunks)) + + // creating a snapshot while a different snapshot is being created should error + manager, teardown = setupBusyManager(t) + defer teardown() + _, err = manager.Create(9) + require.Error(t, err) +} + +func TestManager_Prune(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + manager := snapshots.NewManager(store, nil) + + pruned, err := manager.Prune(2) + require.NoError(t, err) + assert.EqualValues(t, 1, pruned) + + list, err := manager.List() + require.NoError(t, err) + assert.Len(t, list, 3) + + // Prune should error while a snapshot is being taken + manager, teardown = setupBusyManager(t) + defer teardown() + _, err = manager.Prune(2) + require.Error(t, err) +} + +func TestManager_Restore(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + target := &mockSnapshotter{} + manager := snapshots.NewManager(store, target) + + chunks := [][]byte{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + } + + // Restore errors on invalid format + err := manager.Restore(types.Snapshot{ + Height: 3, + Format: 0, + Hash: []byte{1, 2, 3}, + Chunks: uint32(len(chunks)), + Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, + }) + require.Error(t, err) + require.Equal(t, types.ErrUnknownFormat, err) + + // Restore errors on no chunks + err = manager.Restore(types.Snapshot{Height: 3, Format: 1, Hash: []byte{1, 2, 3}}) + require.Error(t, err) + + // Restore errors on chunk and chunkhashes mismatch + err = manager.Restore(types.Snapshot{ + Height: 3, + Format: 1, + Hash: []byte{1, 2, 3}, + Chunks: 4, + Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, + }) + require.Error(t, err) + + // Starting a restore works + err = manager.Restore(types.Snapshot{ + Height: 3, + Format: 1, + Hash: []byte{1, 2, 3}, + Chunks: 3, + Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, + }) + require.NoError(t, err) + + // While the restore is in progress, any other operations fail + _, err = manager.Create(4) + require.Error(t, err) + + _, err = manager.Prune(1) + require.Error(t, err) + + // Feeding an invalid chunk should error due to invalid checksum, but not abort restoration. + _, err = manager.RestoreChunk([]byte{9, 9, 9}) + require.Error(t, err) + require.True(t, errors.Is(err, types.ErrChunkHashMismatch)) + + // Feeding the chunks should work + for i, chunk := range chunks { + done, err := manager.RestoreChunk(chunk) + require.NoError(t, err) + if i == len(chunks)-1 { + assert.True(t, done) + } else { + assert.False(t, done) + } + } + + assert.Equal(t, chunks, target.chunks) + + // Starting a new restore should fail now, because the target already has contents. + err = manager.Restore(types.Snapshot{ + Height: 3, + Format: 1, + Hash: []byte{1, 2, 3}, + Chunks: 3, + Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, + }) + require.Error(t, err) + + // But if we clear out the target we should be able to start a new restore. This time we'll + // fail it with a checksum error. That error should stop the operation, so that we can do + // a prune operation right after. + target.chunks = nil + err = manager.Restore(types.Snapshot{ + Height: 3, + Format: 1, + Hash: []byte{1, 2, 3}, + Chunks: 3, + Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, + }) + require.NoError(t, err) +} diff --git a/abci/example/iavlstore/snapshots/store.go b/abci/example/iavlstore/snapshots/store.go new file mode 100644 index 000000000..f545be796 --- /dev/null +++ b/abci/example/iavlstore/snapshots/store.go @@ -0,0 +1,366 @@ +package snapshots + +import ( + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "os" + "path/filepath" + "strconv" + "sync" + + "github.com/gogo/protobuf/proto" + db "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" +) + +const ( + // keyPrefixSnapshot is the prefix for snapshot database keys + keyPrefixSnapshot byte = 0x01 +) + +// Store is a snapshot store, containing snapshot metadata and binary chunks. +type Store struct { + db db.DB + dir string + + mtx sync.Mutex + saving map[uint64]bool // heights currently being saved +} + +// NewStore creates a new snapshot store. +func NewStore(db db.DB, dir string) (*Store, error) { + if dir == "" { + return nil, errors.New("snapshot directory not given") + } + err := os.MkdirAll(dir, 0755) + if err != nil { + return nil, fmt.Errorf("failed to create snapshot directory %q: %w", dir, err) + } + + return &Store{ + db: db, + dir: dir, + saving: make(map[uint64]bool), + }, nil +} + +// Delete deletes a snapshot. +func (s *Store) Delete(height uint64, format uint32) error { + s.mtx.Lock() + saving := s.saving[height] + s.mtx.Unlock() + if saving { + return fmt.Errorf("snapshot for height %v format %v is currently being saved", height, format) + } + err := s.db.DeleteSync(encodeKey(height, format)) + if err != nil { + return fmt.Errorf("failed to delete snapshot for height %v format %v: %w", + height, format, err) + } + err = os.RemoveAll(s.pathSnapshot(height, format)) + if err != nil { + return fmt.Errorf("failed to delete snapshot chunks for height %v format %v: %w", + height, format, err) + } + return nil +} + +// Get fetches snapshot info from the database. +func (s *Store) Get(height uint64, format uint32) (*types.Snapshot, error) { + bytes, err := s.db.Get(encodeKey(height, format)) + if err != nil { + return nil, fmt.Errorf("failed to fetch snapshot metadata for height %v format %v: %w", + height, format, err) + } + if bytes == nil { + return nil, nil + } + snapshot := &types.Snapshot{} + err = proto.Unmarshal(bytes, snapshot) + if err != nil { + return nil, fmt.Errorf("failed to decode snapshot metadata for height %v format %v: %w", + height, format, err) + } + if snapshot.Metadata.ChunkHashes == nil { + snapshot.Metadata.ChunkHashes = [][]byte{} + } + return snapshot, nil +} + +// Get fetches the latest snapshot from the database, if any. +func (s *Store) GetLatest() (*types.Snapshot, error) { + iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(math.MaxUint64, math.MaxUint32)) + if err != nil { + return nil, fmt.Errorf("failed to find latest snapshot: %w", err) + } + defer iter.Close() + + var snapshot *types.Snapshot + if iter.Valid() { + snapshot = &types.Snapshot{} + err := proto.Unmarshal(iter.Value(), snapshot) + if err != nil { + return nil, fmt.Errorf("failed to decode latest snapshot: %w", err) + } + } + err = iter.Error() + if err != nil { + return nil, fmt.Errorf("failed to find latest snapshot: %w", err) + } + return snapshot, nil +} + +// List lists snapshots, in reverse order (newest first). +func (s *Store) List() ([]*types.Snapshot, error) { + iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(math.MaxUint64, math.MaxUint32)) + if err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + defer iter.Close() + + snapshots := make([]*types.Snapshot, 0) + for ; iter.Valid(); iter.Next() { + snapshot := &types.Snapshot{} + err := proto.Unmarshal(iter.Value(), snapshot) + if err != nil { + return nil, fmt.Errorf("failed to decode snapshot info: %w", err) + } + snapshots = append(snapshots, snapshot) + } + err = iter.Error() + if err != nil { + return nil, err + } + return snapshots, nil +} + +// Load loads a snapshot (both metadata and binary chunks). The chunks must be consumed and closed. +// Returns nil if the snapshot does not exist. +func (s *Store) Load(height uint64, format uint32) (*types.Snapshot, <-chan io.ReadCloser, error) { + snapshot, err := s.Get(height, format) + if err != nil { + return nil, nil, err + } + if snapshot == nil { + return nil, nil, nil + } + + ch := make(chan io.ReadCloser) + go func() { + defer close(ch) + for i := uint32(0); i < snapshot.Chunks; i++ { + pr, pw := io.Pipe() + ch <- pr + chunk, err := s.loadChunkFile(height, format, i) + if err != nil { + pw.CloseWithError(err) + return + } + defer chunk.Close() + _, err = io.Copy(pw, chunk) + if err != nil { + pw.CloseWithError(err) + return + } + chunk.Close() + pw.Close() + } + }() + + return snapshot, ch, nil +} + +// LoadChunk loads a chunk from disk, or returns nil if it does not exist. The caller must call +// Close() on it when done. +func (s *Store) LoadChunk(height uint64, format uint32, chunk uint32) (io.ReadCloser, error) { + path := s.pathChunk(height, format, chunk) + file, err := os.Open(path) + if os.IsNotExist(err) { + return nil, nil + } + return file, err +} + +// loadChunkFile loads a chunk from disk, and errors if it does not exist. +func (s *Store) loadChunkFile(height uint64, format uint32, chunk uint32) (io.ReadCloser, error) { + path := s.pathChunk(height, format, chunk) + file, err := os.Open(path) + if err != nil { + return nil, err + } + return file, nil +} + +// Prune removes old snapshots. The given number of most recent heights (regardless of format) are retained. +func (s *Store) Prune(retain uint32) (uint64, error) { + iter, err := s.db.ReverseIterator(encodeKey(0, 0), encodeKey(math.MaxUint64, math.MaxUint32)) + if err != nil { + return 0, fmt.Errorf("failed to prune snapshots: %w", err) + } + defer iter.Close() + + pruned := uint64(0) + prunedHeights := make(map[uint64]bool) + skip := make(map[uint64]bool) + for ; iter.Valid(); iter.Next() { + height, format, err := decodeKey(iter.Key()) + if err != nil { + return 0, fmt.Errorf("failed to prune snapshots: %w", err) + } + if skip[height] || uint32(len(skip)) < retain { + skip[height] = true + continue + } + err = s.Delete(height, format) + if err != nil { + return 0, fmt.Errorf("failed to prune snapshots: %w", err) + } + pruned++ + prunedHeights[height] = true + } + // Since Delete() deletes a specific format, while we want to prune a height, we clean up + // the height directory as well + for height, ok := range prunedHeights { + if ok { + err = os.Remove(s.pathHeight(height)) + if err != nil { + return 0, fmt.Errorf("failed to remove snapshot directory for height %v", height) + } + } + } + err = iter.Error() + if err != nil { + return 0, err + } + return pruned, nil +} + +// Save saves a snapshot to disk, returning it. +func (s *Store) Save( + height uint64, format uint32, chunks <-chan io.ReadCloser, +) (*types.Snapshot, error) { + defer DrainChunks(chunks) + if height == 0 { + return nil, errors.New("snapshot height cannot be 0") + } + + s.mtx.Lock() + saving := s.saving[height] + s.saving[height] = true + s.mtx.Unlock() + if saving { + return nil, fmt.Errorf("a snapshot for height %v is already being saved", height) + } + defer func() { + s.mtx.Lock() + delete(s.saving, height) + s.mtx.Unlock() + }() + + exists, err := s.db.Has(encodeKey(height, format)) + if err != nil { + return nil, err + } + if exists { + return nil, fmt.Errorf("snapshot already exists for height %v format %v", height, format) + } + + snapshot := &types.Snapshot{ + Height: height, + Format: format, + } + index := uint32(0) + snapshotHasher := sha256.New() + chunkHasher := sha256.New() + for chunkBody := range chunks { + defer chunkBody.Close() // nolint: staticcheck + dir := s.pathSnapshot(height, format) + err = os.MkdirAll(dir, 0755) + if err != nil { + return nil, fmt.Errorf("failed to create snapshot directory %q: %w", dir, err) + } + path := s.pathChunk(height, format, index) + file, err := os.Create(path) + if err != nil { + return nil, fmt.Errorf("failed to create snapshot chunk file %q: %w", path, err) + } + defer file.Close() // nolint: staticcheck + chunkHasher.Reset() + _, err = io.Copy(io.MultiWriter(file, chunkHasher, snapshotHasher), chunkBody) + if err != nil { + return nil, fmt.Errorf("failed to generate snapshot chunk %v: %w", index, err) + } + err = file.Close() + if err != nil { + return nil, fmt.Errorf("failed to close snapshot chunk %v: %w", index, err) + } + err = chunkBody.Close() + if err != nil { + return nil, fmt.Errorf("failed to close snapshot chunk %v: %w", index, err) + } + snapshot.Metadata.ChunkHashes = append(snapshot.Metadata.ChunkHashes, chunkHasher.Sum(nil)) + index++ + } + snapshot.Chunks = index + snapshot.Hash = snapshotHasher.Sum(nil) + err = s.saveSnapshot(snapshot) + if err != nil { + return nil, err + } + return snapshot, nil +} + +// saveSnapshot saves snapshot metadata to the database. +func (s *Store) saveSnapshot(snapshot *types.Snapshot) error { + value, err := proto.Marshal(snapshot) + if err != nil { + return fmt.Errorf("failed to encode snapshot metadata: %w", err) + } + err = s.db.SetSync(encodeKey(snapshot.Height, snapshot.Format), value) + if err != nil { + return fmt.Errorf("failed to store snapshot: %w", err) + } + return nil +} + +// pathHeight generates the path to a height, containing multiple snapshot formats. +func (s *Store) pathHeight(height uint64) string { + return filepath.Join(s.dir, strconv.FormatUint(height, 10)) +} + +// pathSnapshot generates a snapshot path, as a specific format under a height. +func (s *Store) pathSnapshot(height uint64, format uint32) string { + return filepath.Join(s.pathHeight(height), strconv.FormatUint(uint64(format), 10)) +} + +// pathChunk generates a snapshot chunk path. +func (s *Store) pathChunk(height uint64, format uint32, chunk uint32) string { + return filepath.Join(s.pathSnapshot(height, format), strconv.FormatUint(uint64(chunk), 10)) +} + +// decodeKey decodes a snapshot key. +func decodeKey(k []byte) (uint64, uint32, error) { + if len(k) != 13 { + return 0, 0, fmt.Errorf("invalid snapshot key with length %v", len(k)) + } + if k[0] != keyPrefixSnapshot { + return 0, 0, fmt.Errorf("invalid snapshot key prefix %x", k[0]) + } + height := binary.BigEndian.Uint64(k[1:9]) + format := binary.BigEndian.Uint32(k[9:13]) + return height, format, nil +} + +// encodeKey encodes a snapshot key. +func encodeKey(height uint64, format uint32) []byte { + k := make([]byte, 13) + k[0] = keyPrefixSnapshot + binary.BigEndian.PutUint64(k[1:], height) + binary.BigEndian.PutUint32(k[9:], format) + return k +} diff --git a/abci/example/iavlstore/snapshots/store_test.go b/abci/example/iavlstore/snapshots/store_test.go new file mode 100644 index 000000000..80d46d445 --- /dev/null +++ b/abci/example/iavlstore/snapshots/store_test.go @@ -0,0 +1,366 @@ +package snapshots_test + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + db "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots" + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots/types" +) + +func setupStore(t *testing.T) (*snapshots.Store, func()) { + tempdir, err := ioutil.TempDir("", "snapshots") + require.NoError(t, err) + + store, err := snapshots.NewStore(db.NewMemDB(), tempdir) + require.NoError(t, err) + + _, err = store.Save(1, 1, makeChunks([][]byte{ + {1, 1, 0}, {1, 1, 1}, + })) + require.NoError(t, err) + _, err = store.Save(2, 1, makeChunks([][]byte{ + {2, 1, 0}, {2, 1, 1}, + })) + require.NoError(t, err) + _, err = store.Save(2, 2, makeChunks([][]byte{ + {2, 2, 0}, {2, 2, 1}, {2, 2, 2}, + })) + require.NoError(t, err) + _, err = store.Save(3, 2, makeChunks([][]byte{ + {3, 2, 0}, {3, 2, 1}, {3, 2, 2}, + })) + require.NoError(t, err) + + teardown := func() { + err := os.RemoveAll(tempdir) + if err != nil { + t.Logf("Failed to remove tempdir %q: %v", tempdir, err) + } + } + return store, teardown +} + +func TestNewStore(t *testing.T) { + tempdir, err := ioutil.TempDir("", "snapshots") + require.NoError(t, err) + defer os.RemoveAll(tempdir) + + _, err = snapshots.NewStore(db.NewMemDB(), tempdir) + require.NoError(t, err) +} + +func TestNewStore_ErrNoDir(t *testing.T) { + _, err := snapshots.NewStore(db.NewMemDB(), "") + require.Error(t, err) +} + +func TestNewStore_ErrDirFailure(t *testing.T) { + tempfile, err := ioutil.TempFile("", "snapshots") + require.NoError(t, err) + defer func() { + os.RemoveAll(tempfile.Name()) + tempfile.Close() + }() + tempdir := filepath.Join(tempfile.Name(), "subdir") + + _, err = snapshots.NewStore(db.NewMemDB(), tempdir) + require.Error(t, err) +} + +func TestStore_Delete(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Deleting a snapshot should remove it + err := store.Delete(2, 2) + require.NoError(t, err) + + snapshot, err := store.Get(2, 2) + require.NoError(t, err) + assert.Nil(t, snapshot) + + snapshots, err := store.List() + require.NoError(t, err) + assert.Len(t, snapshots, 3) + + // Deleting it again should not error + err = store.Delete(2, 2) + require.NoError(t, err) + + // Deleting a snapshot being saved should error + ch := make(chan io.ReadCloser) + go store.Save(9, 1, ch) + + time.Sleep(10 * time.Millisecond) + err = store.Delete(9, 1) + require.Error(t, err) + + // But after it's saved it should work + close(ch) + time.Sleep(10 * time.Millisecond) + err = store.Delete(9, 1) + require.NoError(t, err) +} + +func TestStore_Get(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Loading a missing snapshot should return nil + snapshot, err := store.Get(9, 9) + require.NoError(t, err) + assert.Nil(t, snapshot) + + // Loading a snapshot should returns its metadata + snapshot, err = store.Get(2, 1) + require.NoError(t, err) + assert.Equal(t, &types.Snapshot{ + Height: 2, + Format: 1, + Chunks: 2, + Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}), + Metadata: types.Metadata{ + ChunkHashes: [][]byte{ + checksum([]byte{2, 1, 0}), + checksum([]byte{2, 1, 1}), + }, + }, + }, snapshot) +} + +func TestStore_GetLatest(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Loading a missing snapshot should return nil + snapshot, err := store.GetLatest() + require.NoError(t, err) + assert.Equal(t, &types.Snapshot{ + Height: 3, + Format: 2, + Chunks: 3, + Hash: hash([][]byte{ + {3, 2, 0}, + {3, 2, 1}, + {3, 2, 2}, + }), + Metadata: types.Metadata{ + ChunkHashes: checksums([][]byte{ + {3, 2, 0}, + {3, 2, 1}, + {3, 2, 2}, + }), + }, + }, snapshot) +} + +func TestStore_List(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + snapshots, err := store.List() + require.NoError(t, err) + + require.Equal(t, []*types.Snapshot{ + {Height: 3, Format: 2, Chunks: 3, Hash: hash([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}})}, + }, + {Height: 2, Format: 2, Chunks: 3, Hash: hash([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}})}, + }, + {Height: 2, Format: 1, Chunks: 2, Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 1, 0}, {2, 1, 1}})}, + }, + {Height: 1, Format: 1, Chunks: 2, Hash: hash([][]byte{{1, 1, 0}, {1, 1, 1}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{1, 1, 0}, {1, 1, 1}})}, + }, + }, snapshots) +} + +func TestStore_Load(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Loading a missing snapshot should return nil + snapshot, chunks, err := store.Load(9, 9) + require.NoError(t, err) + assert.Nil(t, snapshot) + assert.Nil(t, chunks) + + // Loading a snapshot should returns its metadata and chunks + snapshot, chunks, err = store.Load(2, 1) + require.NoError(t, err) + assert.Equal(t, &types.Snapshot{ + Height: 2, + Format: 1, + Chunks: 2, + Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}), + Metadata: types.Metadata{ + ChunkHashes: [][]byte{ + checksum([]byte{2, 1, 0}), + checksum([]byte{2, 1, 1}), + }, + }, + }, snapshot) + + for i := uint32(0); i < snapshot.Chunks; i++ { + reader, ok := <-chunks + require.True(t, ok) + chunk, err := ioutil.ReadAll(reader) + require.NoError(t, err) + err = reader.Close() + require.NoError(t, err) + assert.Equal(t, []byte{2, 1, byte(i)}, chunk) + } + assert.Empty(t, chunks) +} + +func TestStore_LoadChunk(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Loading a missing snapshot should return nil + chunk, err := store.LoadChunk(9, 9, 0) + require.NoError(t, err) + assert.Nil(t, chunk) + + // Loading a missing chunk index should return nil + chunk, err = store.LoadChunk(2, 1, 2) + require.NoError(t, err) + require.Nil(t, chunk) + + // Loading a chunk should returns a content reader + chunk, err = store.LoadChunk(2, 1, 0) + require.NoError(t, err) + require.NotNil(t, chunk) + body, err := ioutil.ReadAll(chunk) + require.NoError(t, err) + assert.Equal(t, []byte{2, 1, 0}, body) + err = chunk.Close() + require.NoError(t, err) +} + +func TestStore_Prune(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Pruning too many snapshots should be fine + pruned, err := store.Prune(4) + require.NoError(t, err) + assert.EqualValues(t, 0, pruned) + + snapshots, err := store.List() + require.NoError(t, err) + assert.Len(t, snapshots, 4) + + // Pruning until the last two heights should leave three snapshots (for two heights) + pruned, err = store.Prune(2) + require.NoError(t, err) + assert.EqualValues(t, 1, pruned) + + snapshots, err = store.List() + require.NoError(t, err) + require.Equal(t, []*types.Snapshot{ + {Height: 3, Format: 2, Chunks: 3, Hash: hash([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{3, 2, 0}, {3, 2, 1}, {3, 2, 2}})}, + }, + {Height: 2, Format: 2, Chunks: 3, Hash: hash([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 2, 0}, {2, 2, 1}, {2, 2, 2}})}, + }, + {Height: 2, Format: 1, Chunks: 2, Hash: hash([][]byte{{2, 1, 0}, {2, 1, 1}}), + Metadata: types.Metadata{ChunkHashes: checksums([][]byte{{2, 1, 0}, {2, 1, 1}})}, + }, + }, snapshots) + + // Pruning all heights should also be fine + pruned, err = store.Prune(0) + require.NoError(t, err) + assert.EqualValues(t, 3, pruned) + + snapshots, err = store.List() + require.NoError(t, err) + assert.Empty(t, snapshots) +} + +func TestStore_Save(t *testing.T) { + store, teardown := setupStore(t) + defer teardown() + + // Saving a snapshot should work + snapshot, err := store.Save(4, 1, makeChunks([][]byte{{1}, {2}})) + require.NoError(t, err) + assert.Equal(t, &types.Snapshot{ + Height: 4, + Format: 1, + Chunks: 2, + Hash: hash([][]byte{{1}, {2}}), + Metadata: types.Metadata{ + ChunkHashes: [][]byte{ + checksum([]byte{1}), + checksum([]byte{2}), + }, + }, + }, snapshot) + loaded, err := store.Get(snapshot.Height, snapshot.Format) + require.NoError(t, err) + assert.Equal(t, snapshot, loaded) + + // Saving an existing snapshot should error + _, err = store.Save(4, 1, makeChunks([][]byte{{1}, {2}})) + require.Error(t, err) + + // Saving at height 0 should error + _, err = store.Save(0, 1, makeChunks([][]byte{{1}, {2}})) + require.Error(t, err) + + // Saving at format 0 should be fine + _, err = store.Save(1, 0, makeChunks([][]byte{{1}, {2}})) + require.NoError(t, err) + + // Saving a snapshot with no chunks should be fine, as should loading it + _, err = store.Save(5, 1, makeChunks([][]byte{})) + require.NoError(t, err) + snapshot, chunks, err := store.Load(5, 1) + require.NoError(t, err) + assert.Equal(t, &types.Snapshot{Height: 5, Format: 1, Hash: hash([][]byte{}), Metadata: types.Metadata{ChunkHashes: [][]byte{}}}, snapshot) + assert.Empty(t, chunks) + + // Saving a snapshot should error if a chunk reader returns an error, and it should empty out + // the channel + someErr := errors.New("boom") + pr, pw := io.Pipe() + err = pw.CloseWithError(someErr) + require.NoError(t, err) + + ch := make(chan io.ReadCloser, 2) + ch <- pr + ch <- ioutil.NopCloser(bytes.NewBuffer([]byte{0xff})) + close(ch) + + _, err = store.Save(6, 1, ch) + require.Error(t, err) + require.True(t, errors.Is(err, someErr)) + assert.Empty(t, ch) + + // Saving a snapshot should error if a snapshot is already in progress for the same height, + // regardless of format. However, a different height should succeed. + ch = make(chan io.ReadCloser) + go store.Save(7, 1, ch) + time.Sleep(10 * time.Millisecond) + _, err = store.Save(7, 2, makeChunks(nil)) + require.Error(t, err) + _, err = store.Save(8, 1, makeChunks(nil)) + require.NoError(t, err) + close(ch) +} diff --git a/abci/example/iavlstore/snapshots/types/convert.go b/abci/example/iavlstore/snapshots/types/convert.go new file mode 100644 index 000000000..33e315ee0 --- /dev/null +++ b/abci/example/iavlstore/snapshots/types/convert.go @@ -0,0 +1,39 @@ +package types + +import ( + fmt "fmt" + + proto "github.com/gogo/protobuf/proto" + abci "github.com/tendermint/tendermint/abci/types" +) + +// Converts an ABCI snapshot to a snapshot. Mainly to decode the SDK metadata. +func SnapshotFromABCI(in *abci.Snapshot) (Snapshot, error) { + snapshot := Snapshot{ + Height: in.Height, + Format: in.Format, + Chunks: in.Chunks, + Hash: in.Hash, + } + err := proto.Unmarshal(in.Metadata, &snapshot.Metadata) + if err != nil { + return Snapshot{}, fmt.Errorf("failed to unmarshal snapshot metadata: %w", err) + } + return snapshot, nil +} + +// Converts a Snapshot to its ABCI representation. Mainly to encode the SDK metadata. +func (s Snapshot) ToABCI() (abci.Snapshot, error) { + out := abci.Snapshot{ + Height: s.Height, + Format: s.Format, + Chunks: s.Chunks, + Hash: s.Hash, + } + var err error + out.Metadata, err = proto.Marshal(&s.Metadata) + if err != nil { + return abci.Snapshot{}, fmt.Errorf("failed to marshal snapshot metadata: %w", err) + } + return out, nil +} diff --git a/abci/example/iavlstore/snapshots/types/errors.go b/abci/example/iavlstore/snapshots/types/errors.go new file mode 100644 index 000000000..c98dd055e --- /dev/null +++ b/abci/example/iavlstore/snapshots/types/errors.go @@ -0,0 +1,16 @@ +package types + +import ( + "errors" +) + +var ( + // ErrUnknownFormat is returned when an unknown format is used. + ErrUnknownFormat = errors.New("unknown snapshot format") + + // ErrChunkHashMismatch is returned when chunk hash verification failed. + ErrChunkHashMismatch = errors.New("chunk hash verification failed") + + // ErrInvalidMetadata is returned when the snapshot metadata is invalid. + ErrInvalidMetadata = errors.New("invalid snapshot metadata") +) diff --git a/abci/example/iavlstore/snapshots/types/format.go b/abci/example/iavlstore/snapshots/types/format.go new file mode 100644 index 000000000..edfdb36d7 --- /dev/null +++ b/abci/example/iavlstore/snapshots/types/format.go @@ -0,0 +1,6 @@ +package types + +// CurrentFormat is the currently used format for snapshots. Snapshots using the same format +// must be identical across all nodes for a given height, so this must be bumped when the binary +// snapshot output changes. +const CurrentFormat uint32 = 1 diff --git a/abci/example/iavlstore/snapshots/types/snapshot.pb.go b/abci/example/iavlstore/snapshots/types/snapshot.pb.go new file mode 100644 index 000000000..92fcb8151 --- /dev/null +++ b/abci/example/iavlstore/snapshots/types/snapshot.pb.go @@ -0,0 +1,667 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: cosmos/base/snapshots/v1beta1/snapshot.proto + +package types + +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// Snapshot contains Tendermint state sync snapshot info. +type Snapshot struct { + Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + Format uint32 `protobuf:"varint,2,opt,name=format,proto3" json:"format,omitempty"` + Chunks uint32 `protobuf:"varint,3,opt,name=chunks,proto3" json:"chunks,omitempty"` + Hash []byte `protobuf:"bytes,4,opt,name=hash,proto3" json:"hash,omitempty"` + Metadata Metadata `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata"` +} + +func (m *Snapshot) Reset() { *m = Snapshot{} } +func (m *Snapshot) String() string { return proto.CompactTextString(m) } +func (*Snapshot) ProtoMessage() {} +func (*Snapshot) Descriptor() ([]byte, []int) { + return fileDescriptor_dd7a3c9b0a19e1ee, []int{0} +} +func (m *Snapshot) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Snapshot) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Snapshot.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Snapshot) XXX_Merge(src proto.Message) { + xxx_messageInfo_Snapshot.Merge(m, src) +} +func (m *Snapshot) XXX_Size() int { + return m.Size() +} +func (m *Snapshot) XXX_DiscardUnknown() { + xxx_messageInfo_Snapshot.DiscardUnknown(m) +} + +var xxx_messageInfo_Snapshot proto.InternalMessageInfo + +func (m *Snapshot) GetHeight() uint64 { + if m != nil { + return m.Height + } + return 0 +} + +func (m *Snapshot) GetFormat() uint32 { + if m != nil { + return m.Format + } + return 0 +} + +func (m *Snapshot) GetChunks() uint32 { + if m != nil { + return m.Chunks + } + return 0 +} + +func (m *Snapshot) GetHash() []byte { + if m != nil { + return m.Hash + } + return nil +} + +func (m *Snapshot) GetMetadata() Metadata { + if m != nil { + return m.Metadata + } + return Metadata{} +} + +// Metadata contains SDK-specific snapshot metadata. +type Metadata struct { + ChunkHashes [][]byte `protobuf:"bytes,1,rep,name=chunk_hashes,json=chunkHashes,proto3" json:"chunk_hashes,omitempty"` +} + +func (m *Metadata) Reset() { *m = Metadata{} } +func (m *Metadata) String() string { return proto.CompactTextString(m) } +func (*Metadata) ProtoMessage() {} +func (*Metadata) Descriptor() ([]byte, []int) { + return fileDescriptor_dd7a3c9b0a19e1ee, []int{1} +} +func (m *Metadata) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Metadata.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Metadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_Metadata.Merge(m, src) +} +func (m *Metadata) XXX_Size() int { + return m.Size() +} +func (m *Metadata) XXX_DiscardUnknown() { + xxx_messageInfo_Metadata.DiscardUnknown(m) +} + +var xxx_messageInfo_Metadata proto.InternalMessageInfo + +func (m *Metadata) GetChunkHashes() [][]byte { + if m != nil { + return m.ChunkHashes + } + return nil +} + +func init() { + proto.RegisterType((*Snapshot)(nil), "cosmos.base.snapshots.v1beta1.Snapshot") + proto.RegisterType((*Metadata)(nil), "cosmos.base.snapshots.v1beta1.Metadata") +} + +func init() { + proto.RegisterFile("cosmos/base/snapshots/v1beta1/snapshot.proto", fileDescriptor_dd7a3c9b0a19e1ee) +} + +var fileDescriptor_dd7a3c9b0a19e1ee = []byte{ + // 288 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x31, 0x4e, 0xc3, 0x30, + 0x14, 0x86, 0x63, 0x1a, 0xaa, 0xc8, 0x0d, 0x8b, 0x85, 0x90, 0x85, 0x84, 0x09, 0x5d, 0xc8, 0xd0, + 0x3a, 0x2a, 0xdc, 0xa0, 0x03, 0x82, 0x81, 0x25, 0x6c, 0x2c, 0xc8, 0x49, 0x4d, 0x5c, 0x55, 0xa9, + 0xa3, 0x3e, 0x17, 0x89, 0x5b, 0x70, 0x15, 0x6e, 0xd1, 0xb1, 0x23, 0x13, 0x42, 0xc9, 0x45, 0x50, + 0x1c, 0x13, 0x31, 0x75, 0xca, 0xfb, 0xbf, 0x7c, 0x4f, 0xcf, 0xfa, 0xf1, 0x24, 0xd7, 0x50, 0x6a, + 0x48, 0x32, 0x01, 0x32, 0x81, 0xb5, 0xa8, 0x40, 0x69, 0x03, 0xc9, 0xdb, 0x2c, 0x93, 0x46, 0xcc, + 0x7a, 0xc2, 0xab, 0x8d, 0x36, 0x9a, 0x5c, 0x74, 0x36, 0x6f, 0x6d, 0xde, 0xdb, 0xdc, 0xd9, 0xe7, + 0xa7, 0x85, 0x2e, 0xb4, 0x35, 0x93, 0x76, 0xea, 0x96, 0xc6, 0x9f, 0x08, 0x07, 0x4f, 0xce, 0x25, + 0x67, 0x78, 0xa8, 0xe4, 0xb2, 0x50, 0x86, 0xa2, 0x08, 0xc5, 0x7e, 0xea, 0x52, 0xcb, 0x5f, 0xf5, + 0xa6, 0x14, 0x86, 0x1e, 0x45, 0x28, 0x3e, 0x49, 0x5d, 0x6a, 0x79, 0xae, 0xb6, 0xeb, 0x15, 0xd0, + 0x41, 0xc7, 0xbb, 0x44, 0x08, 0xf6, 0x95, 0x00, 0x45, 0xfd, 0x08, 0xc5, 0x61, 0x6a, 0x67, 0xf2, + 0x80, 0x83, 0x52, 0x1a, 0xb1, 0x10, 0x46, 0xd0, 0xe3, 0x08, 0xc5, 0xa3, 0x9b, 0x6b, 0x7e, 0xf0, + 0xc1, 0xfc, 0xd1, 0xe9, 0x73, 0x7f, 0xf7, 0x7d, 0xe9, 0xa5, 0xfd, 0xfa, 0x78, 0x8a, 0x83, 0xbf, + 0x7f, 0xe4, 0x0a, 0x87, 0xf6, 0xe8, 0x4b, 0x7b, 0x44, 0x02, 0x45, 0xd1, 0x20, 0x0e, 0xd3, 0x91, + 0x65, 0xf7, 0x16, 0xcd, 0xef, 0x76, 0x35, 0x43, 0xfb, 0x9a, 0xa1, 0x9f, 0x9a, 0xa1, 0x8f, 0x86, + 0x79, 0xfb, 0x86, 0x79, 0x5f, 0x0d, 0xf3, 0x9e, 0x27, 0xc5, 0xd2, 0xa8, 0x6d, 0xc6, 0x73, 0x5d, + 0x26, 0xae, 0xea, 0xee, 0x33, 0x85, 0xc5, 0xea, 0x5f, 0xe1, 0xe6, 0xbd, 0x92, 0x90, 0x0d, 0x6d, + 0x63, 0xb7, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x89, 0x2e, 0x8f, 0x96, 0x01, 0x00, 0x00, +} + +func (m *Snapshot) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Snapshot) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Snapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + { + size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSnapshot(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + if len(m.Hash) > 0 { + i -= len(m.Hash) + copy(dAtA[i:], m.Hash) + i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Hash))) + i-- + dAtA[i] = 0x22 + } + if m.Chunks != 0 { + i = encodeVarintSnapshot(dAtA, i, uint64(m.Chunks)) + i-- + dAtA[i] = 0x18 + } + if m.Format != 0 { + i = encodeVarintSnapshot(dAtA, i, uint64(m.Format)) + i-- + dAtA[i] = 0x10 + } + if m.Height != 0 { + i = encodeVarintSnapshot(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Metadata) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Metadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ChunkHashes) > 0 { + for iNdEx := len(m.ChunkHashes) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ChunkHashes[iNdEx]) + copy(dAtA[i:], m.ChunkHashes[iNdEx]) + i = encodeVarintSnapshot(dAtA, i, uint64(len(m.ChunkHashes[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func encodeVarintSnapshot(dAtA []byte, offset int, v uint64) int { + offset -= sovSnapshot(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Snapshot) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Height != 0 { + n += 1 + sovSnapshot(uint64(m.Height)) + } + if m.Format != 0 { + n += 1 + sovSnapshot(uint64(m.Format)) + } + if m.Chunks != 0 { + n += 1 + sovSnapshot(uint64(m.Chunks)) + } + l = len(m.Hash) + if l > 0 { + n += 1 + l + sovSnapshot(uint64(l)) + } + l = m.Metadata.Size() + n += 1 + l + sovSnapshot(uint64(l)) + return n +} + +func (m *Metadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.ChunkHashes) > 0 { + for _, b := range m.ChunkHashes { + l = len(b) + n += 1 + l + sovSnapshot(uint64(l)) + } + } + return n +} + +func sovSnapshot(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozSnapshot(x uint64) (n int) { + return sovSnapshot(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Snapshot) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Snapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Snapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Format", wireType) + } + m.Format = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Format |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + m.Chunks = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Chunks |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Hash = append(m.Hash[:0], dAtA[iNdEx:postIndex]...) + if m.Hash == nil { + m.Hash = []byte{} + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSnapshot(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Metadata) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Metadata: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Metadata: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ChunkHashes", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ChunkHashes = append(m.ChunkHashes, make([]byte, postIndex-iNdEx)) + copy(m.ChunkHashes[len(m.ChunkHashes)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSnapshot(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipSnapshot(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSnapshot + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSnapshot + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSnapshot + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthSnapshot + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupSnapshot + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthSnapshot + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthSnapshot = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowSnapshot = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupSnapshot = fmt.Errorf("proto: unexpected end of group") +) diff --git a/abci/example/iavlstore/snapshots/types/snapshotter.go b/abci/example/iavlstore/snapshots/types/snapshotter.go new file mode 100644 index 000000000..1ebd763b5 --- /dev/null +++ b/abci/example/iavlstore/snapshots/types/snapshotter.go @@ -0,0 +1,16 @@ +package types + +import "io" + +// Snapshotter is something that can create and restore snapshots, consisting of streamed binary +// chunks - all of which must be read from the channel and closed. If an unsupported format is +// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf). +type Snapshotter interface { + // Snapshot creates a state snapshot, returning a channel of snapshot chunk readers. + Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) + + // Restore restores a state snapshot, taking snapshot chunk readers as input. + // If the ready channel is non-nil, it returns a ready signal (by being closed) once the + // restorer is ready to accept chunks. + Restore(height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}) error +} diff --git a/abci/example/iavlstore/snapshots/types/store.pb.go b/abci/example/iavlstore/snapshots/types/store.pb.go new file mode 100644 index 000000000..fc1ad4ba8 --- /dev/null +++ b/abci/example/iavlstore/snapshots/types/store.pb.go @@ -0,0 +1,851 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: cosmos/base/store/v1beta1/snapshot.proto + +package types + +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// SnapshotItem is an item contained in a rootmulti.Store snapshot. +type SnapshotItem struct { + // item is the specific type of snapshot item. + // + // Types that are valid to be assigned to Item: + // *SnapshotItem_Store + // *SnapshotItem_IAVL + Item isSnapshotItem_Item `protobuf_oneof:"item"` +} + +func (m *SnapshotItem) Reset() { *m = SnapshotItem{} } +func (m *SnapshotItem) String() string { return proto.CompactTextString(m) } +func (*SnapshotItem) ProtoMessage() {} +func (*SnapshotItem) Descriptor() ([]byte, []int) { + return fileDescriptor_9c55879db4cc4502, []int{0} +} +func (m *SnapshotItem) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SnapshotItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SnapshotItem.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SnapshotItem) XXX_Merge(src proto.Message) { + xxx_messageInfo_SnapshotItem.Merge(m, src) +} +func (m *SnapshotItem) XXX_Size() int { + return m.Size() +} +func (m *SnapshotItem) XXX_DiscardUnknown() { + xxx_messageInfo_SnapshotItem.DiscardUnknown(m) +} + +var xxx_messageInfo_SnapshotItem proto.InternalMessageInfo + +type isSnapshotItem_Item interface { + isSnapshotItem_Item() + MarshalTo([]byte) (int, error) + Size() int +} + +type SnapshotItem_Store struct { + Store *SnapshotStoreItem `protobuf:"bytes,1,opt,name=store,proto3,oneof" json:"store,omitempty"` +} +type SnapshotItem_IAVL struct { + IAVL *SnapshotIAVLItem `protobuf:"bytes,2,opt,name=iavl,proto3,oneof" json:"iavl,omitempty"` +} + +func (*SnapshotItem_Store) isSnapshotItem_Item() {} +func (*SnapshotItem_IAVL) isSnapshotItem_Item() {} + +func (m *SnapshotItem) GetItem() isSnapshotItem_Item { + if m != nil { + return m.Item + } + return nil +} + +func (m *SnapshotItem) GetStore() *SnapshotStoreItem { + if x, ok := m.GetItem().(*SnapshotItem_Store); ok { + return x.Store + } + return nil +} + +func (m *SnapshotItem) GetIAVL() *SnapshotIAVLItem { + if x, ok := m.GetItem().(*SnapshotItem_IAVL); ok { + return x.IAVL + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*SnapshotItem) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*SnapshotItem_Store)(nil), + (*SnapshotItem_IAVL)(nil), + } +} + +// SnapshotStoreItem contains metadata about a snapshotted store. +type SnapshotStoreItem struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (m *SnapshotStoreItem) Reset() { *m = SnapshotStoreItem{} } +func (m *SnapshotStoreItem) String() string { return proto.CompactTextString(m) } +func (*SnapshotStoreItem) ProtoMessage() {} +func (*SnapshotStoreItem) Descriptor() ([]byte, []int) { + return fileDescriptor_9c55879db4cc4502, []int{1} +} +func (m *SnapshotStoreItem) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SnapshotStoreItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SnapshotStoreItem.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SnapshotStoreItem) XXX_Merge(src proto.Message) { + xxx_messageInfo_SnapshotStoreItem.Merge(m, src) +} +func (m *SnapshotStoreItem) XXX_Size() int { + return m.Size() +} +func (m *SnapshotStoreItem) XXX_DiscardUnknown() { + xxx_messageInfo_SnapshotStoreItem.DiscardUnknown(m) +} + +var xxx_messageInfo_SnapshotStoreItem proto.InternalMessageInfo + +func (m *SnapshotStoreItem) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +// SnapshotIAVLItem is an exported IAVL node. +type SnapshotIAVLItem struct { + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Version int64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` + Height int32 `protobuf:"varint,4,opt,name=height,proto3" json:"height,omitempty"` +} + +func (m *SnapshotIAVLItem) Reset() { *m = SnapshotIAVLItem{} } +func (m *SnapshotIAVLItem) String() string { return proto.CompactTextString(m) } +func (*SnapshotIAVLItem) ProtoMessage() {} +func (*SnapshotIAVLItem) Descriptor() ([]byte, []int) { + return fileDescriptor_9c55879db4cc4502, []int{2} +} +func (m *SnapshotIAVLItem) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SnapshotIAVLItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SnapshotIAVLItem.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SnapshotIAVLItem) XXX_Merge(src proto.Message) { + xxx_messageInfo_SnapshotIAVLItem.Merge(m, src) +} +func (m *SnapshotIAVLItem) XXX_Size() int { + return m.Size() +} +func (m *SnapshotIAVLItem) XXX_DiscardUnknown() { + xxx_messageInfo_SnapshotIAVLItem.DiscardUnknown(m) +} + +var xxx_messageInfo_SnapshotIAVLItem proto.InternalMessageInfo + +func (m *SnapshotIAVLItem) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *SnapshotIAVLItem) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *SnapshotIAVLItem) GetVersion() int64 { + if m != nil { + return m.Version + } + return 0 +} + +func (m *SnapshotIAVLItem) GetHeight() int32 { + if m != nil { + return m.Height + } + return 0 +} + +func init() { + proto.RegisterType((*SnapshotItem)(nil), "cosmos.base.store.v1beta1.SnapshotItem") + proto.RegisterType((*SnapshotStoreItem)(nil), "cosmos.base.store.v1beta1.SnapshotStoreItem") + proto.RegisterType((*SnapshotIAVLItem)(nil), "cosmos.base.store.v1beta1.SnapshotIAVLItem") +} + +func init() { + proto.RegisterFile("cosmos/base/store/v1beta1/snapshot.proto", fileDescriptor_9c55879db4cc4502) +} + +var fileDescriptor_9c55879db4cc4502 = []byte{ + // 324 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x30, + 0x18, 0xc7, 0x1b, 0xd7, 0x4d, 0xfd, 0xdc, 0x61, 0x86, 0x21, 0xd5, 0x43, 0x1d, 0xbb, 0x58, 0x50, + 0x13, 0xa6, 0x4f, 0x60, 0xf1, 0xb0, 0xa1, 0xa7, 0x0c, 0x3c, 0x78, 0x4b, 0x67, 0x68, 0xcb, 0xd6, + 0x65, 0x2c, 0x59, 0x61, 0x6f, 0xe1, 0x6b, 0xf8, 0x26, 0x1e, 0x77, 0xf4, 0x24, 0xd2, 0xbd, 0x88, + 0x24, 0xe9, 0x2e, 0x8a, 0xe0, 0xa9, 0xdf, 0xbf, 0xfc, 0xfe, 0xbf, 0x7c, 0xf0, 0x41, 0x34, 0x91, + 0xaa, 0x90, 0x8a, 0x26, 0x5c, 0x09, 0xaa, 0xb4, 0x5c, 0x0a, 0x5a, 0x0e, 0x12, 0xa1, 0xf9, 0x80, + 0xaa, 0x39, 0x5f, 0xa8, 0x4c, 0x6a, 0xb2, 0x58, 0x4a, 0x2d, 0xf1, 0xa9, 0x23, 0x89, 0x21, 0x89, + 0x25, 0x49, 0x4d, 0x9e, 0x75, 0x53, 0x99, 0x4a, 0x4b, 0x51, 0x33, 0xb9, 0x42, 0xff, 0x0d, 0x41, + 0x7b, 0x5c, 0x3b, 0x46, 0x5a, 0x14, 0xf8, 0x1e, 0x9a, 0xb6, 0x17, 0xa0, 0x1e, 0x8a, 0x8e, 0x6e, + 0xae, 0xc8, 0x9f, 0x46, 0xb2, 0xeb, 0x8d, 0xcd, 0x5f, 0x53, 0x1e, 0x7a, 0xcc, 0x95, 0xf1, 0x03, + 0xf8, 0x39, 0x2f, 0x67, 0xc1, 0x9e, 0x95, 0x5c, 0xfe, 0x43, 0x32, 0xba, 0x7b, 0x7a, 0x34, 0x8e, + 0xf8, 0xa0, 0xfa, 0x3c, 0xf7, 0x4d, 0x1a, 0x7a, 0xcc, 0x4a, 0xe2, 0x16, 0xf8, 0xb9, 0x16, 0x45, + 0xff, 0x02, 0x8e, 0x7f, 0x3d, 0x89, 0x31, 0xf8, 0x73, 0x5e, 0xb8, 0x75, 0x0f, 0x99, 0x9d, 0xfb, + 0x33, 0xe8, 0xfc, 0xd4, 0xe2, 0x0e, 0x34, 0xa6, 0x62, 0x6d, 0xb1, 0x36, 0x33, 0x23, 0xee, 0x42, + 0xb3, 0xe4, 0xb3, 0x95, 0xb0, 0x4b, 0xb6, 0x99, 0x0b, 0x38, 0x80, 0xfd, 0x52, 0x2c, 0x55, 0x2e, + 0xe7, 0x41, 0xa3, 0x87, 0xa2, 0x06, 0xdb, 0x45, 0x7c, 0x02, 0xad, 0x4c, 0xe4, 0x69, 0xa6, 0x03, + 0xbf, 0x87, 0xa2, 0x26, 0xab, 0x53, 0x1c, 0xbf, 0x57, 0x21, 0xda, 0x54, 0x21, 0xfa, 0xaa, 0x42, + 0xf4, 0xba, 0x0d, 0xbd, 0xcd, 0x36, 0xf4, 0x3e, 0xb6, 0xa1, 0xf7, 0x1c, 0xa5, 0xb9, 0xce, 0x56, + 0x09, 0x99, 0xc8, 0x82, 0xd6, 0x27, 0x74, 0x9f, 0x6b, 0xf5, 0x32, 0xad, 0x0f, 0xa9, 0xd7, 0x0b, + 0xa1, 0x92, 0x96, 0xbd, 0xc6, 0xed, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x75, 0x87, 0x24, 0x7b, + 0xea, 0x01, 0x00, 0x00, +} + +func (m *SnapshotItem) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SnapshotItem) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SnapshotItem) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Item != nil { + { + size := m.Item.Size() + i -= size + if _, err := m.Item.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *SnapshotItem_Store) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SnapshotItem_Store) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Store != nil { + { + size, err := m.Store.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSnapshot(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} +func (m *SnapshotItem_IAVL) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SnapshotItem_IAVL) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.IAVL != nil { + { + size, err := m.IAVL.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintSnapshot(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} +func (m *SnapshotStoreItem) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SnapshotStoreItem) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SnapshotStoreItem) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SnapshotIAVLItem) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SnapshotIAVLItem) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SnapshotIAVLItem) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Height != 0 { + i = encodeVarintSnapshot(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x20 + } + if m.Version != 0 { + i = encodeVarintSnapshot(dAtA, i, uint64(m.Version)) + i-- + dAtA[i] = 0x18 + } + if len(m.Value) > 0 { + i -= len(m.Value) + copy(dAtA[i:], m.Value) + i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Value))) + i-- + dAtA[i] = 0x12 + } + if len(m.Key) > 0 { + i -= len(m.Key) + copy(dAtA[i:], m.Key) + i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Key))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SnapshotItem) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Item != nil { + n += m.Item.Size() + } + return n +} + +func (m *SnapshotItem_Store) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Store != nil { + l = m.Store.Size() + n += 1 + l + sovSnapshot(uint64(l)) + } + return n +} +func (m *SnapshotItem_IAVL) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.IAVL != nil { + l = m.IAVL.Size() + n += 1 + l + sovSnapshot(uint64(l)) + } + return n +} +func (m *SnapshotStoreItem) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovSnapshot(uint64(l)) + } + return n +} + +func (m *SnapshotIAVLItem) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovSnapshot(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovSnapshot(uint64(l)) + } + if m.Version != 0 { + n += 1 + sovSnapshot(uint64(m.Version)) + } + if m.Height != 0 { + n += 1 + sovSnapshot(uint64(m.Height)) + } + return n +} + +func (m *SnapshotItem) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotItem: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotItem: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Store", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &SnapshotStoreItem{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Item = &SnapshotItem_Store{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field IAVL", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &SnapshotIAVLItem{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Item = &SnapshotItem_IAVL{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSnapshot(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SnapshotStoreItem) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotStoreItem: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotStoreItem: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSnapshot(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SnapshotIAVLItem) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SnapshotIAVLItem: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SnapshotIAVLItem: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...) + if m.Key == nil { + m.Key = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthSnapshot + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthSnapshot + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) + if m.Value == nil { + m.Value = []byte{} + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + m.Version = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Version |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSnapshot + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipSnapshot(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSnapshot + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} diff --git a/abci/example/iavlstore/snapshots/util.go b/abci/example/iavlstore/snapshots/util.go new file mode 100644 index 000000000..19bc87c1a --- /dev/null +++ b/abci/example/iavlstore/snapshots/util.go @@ -0,0 +1,162 @@ +package snapshots + +import ( + "errors" + "io" +) + +// ChunkWriter reads an input stream, splits it into fixed-size chunks, and writes them to a +// sequence of io.ReadClosers via a channel. +type ChunkWriter struct { + ch chan<- io.ReadCloser + pipe *io.PipeWriter + chunkSize uint64 + written uint64 + closed bool +} + +// NewChunkWriter creates a new ChunkWriter. If chunkSize is 0, no chunking will be done. +func NewChunkWriter(ch chan<- io.ReadCloser, chunkSize uint64) *ChunkWriter { + return &ChunkWriter{ + ch: ch, + chunkSize: chunkSize, + } +} + +// chunk creates a new chunk. +func (w *ChunkWriter) chunk() error { + if w.pipe != nil { + err := w.pipe.Close() + if err != nil { + return err + } + } + pr, pw := io.Pipe() + w.ch <- pr + w.pipe = pw + w.written = 0 + return nil +} + +// Close implements io.Closer. +func (w *ChunkWriter) Close() error { + if !w.closed { + w.closed = true + close(w.ch) + var err error + if w.pipe != nil { + err = w.pipe.Close() + } + return err + } + return nil +} + +// CloseWithError closes the writer and sends an error to the reader. +func (w *ChunkWriter) CloseWithError(err error) { + if !w.closed { + w.closed = true + close(w.ch) + if w.pipe != nil { + w.pipe.CloseWithError(err) + } + } +} + +// Write implements io.Writer. +func (w *ChunkWriter) Write(data []byte) (int, error) { + if w.closed { + return 0, errors.New("cannot write to closed ChunkWriter") + } + nTotal := 0 + for len(data) > 0 { + if w.pipe == nil || (w.written >= w.chunkSize && w.chunkSize > 0) { + err := w.chunk() + if err != nil { + return nTotal, err + } + } + + var writeSize uint64 + if w.chunkSize == 0 { + writeSize = uint64(len(data)) + } else { + writeSize = w.chunkSize - w.written + } + if writeSize > uint64(len(data)) { + writeSize = uint64(len(data)) + } + + n, err := w.pipe.Write(data[:writeSize]) + w.written += uint64(n) + nTotal += n + if err != nil { + return nTotal, err + } + data = data[writeSize:] + } + return nTotal, nil +} + +// ChunkReader reads chunks from a channel of io.ReadClosers and outputs them as an io.Reader +type ChunkReader struct { + ch <-chan io.ReadCloser + reader io.ReadCloser +} + +// NewChunkReader creates a new ChunkReader. +func NewChunkReader(ch <-chan io.ReadCloser) *ChunkReader { + return &ChunkReader{ch: ch} +} + +// next fetches the next chunk from the channel, or returns io.EOF if there are no more chunks. +func (r *ChunkReader) next() error { + reader, ok := <-r.ch + if !ok { + return io.EOF + } + r.reader = reader + return nil +} + +// Close implements io.ReadCloser. +func (r *ChunkReader) Close() error { + var err error + if r.reader != nil { + err = r.reader.Close() + r.reader = nil + } + for reader := range r.ch { + if e := reader.Close(); e != nil && err == nil { + err = e + } + } + return err +} + +// Read implements io.Reader. +func (r *ChunkReader) Read(p []byte) (int, error) { + if r.reader == nil { + err := r.next() + if err != nil { + return 0, err + } + } + n, err := r.reader.Read(p) + if err == io.EOF { + err = r.reader.Close() + r.reader = nil + if err != nil { + return 0, err + } + return r.Read(p) + } + return n, err +} + +// DrainChunks drains and closes all remaining chunks from a chunk channel. +func DrainChunks(chunks <-chan io.ReadCloser) { + for chunk := range chunks { + _ = chunk.Close() + } +} diff --git a/abci/example/iavlstore/snapshots/util_test.go b/abci/example/iavlstore/snapshots/util_test.go new file mode 100644 index 000000000..0886d5946 --- /dev/null +++ b/abci/example/iavlstore/snapshots/util_test.go @@ -0,0 +1,166 @@ +package snapshots_test + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/abci/example/iavlstore/snapshots" +) + +func TestChunkWriter(t *testing.T) { + ch := make(chan io.ReadCloser, 100) + go func() { + chunkWriter := snapshots.NewChunkWriter(ch, 2) + + n, err := chunkWriter.Write([]byte{1, 2, 3}) + require.NoError(t, err) + assert.Equal(t, 3, n) + + n, err = chunkWriter.Write([]byte{4, 5, 6}) + require.NoError(t, err) + assert.Equal(t, 3, n) + + n, err = chunkWriter.Write([]byte{7, 8, 9}) + require.NoError(t, err) + assert.Equal(t, 3, n) + + err = chunkWriter.Close() + require.NoError(t, err) + + // closed writer should error + _, err = chunkWriter.Write([]byte{10}) + require.Error(t, err) + + // closing again should be fine + err = chunkWriter.Close() + require.NoError(t, err) + }() + + assert.Equal(t, [][]byte{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9}}, readChunks(ch)) + + // 0-sized chunks should return the whole body as one chunk + ch = make(chan io.ReadCloser, 100) + go func() { + chunkWriter := snapshots.NewChunkWriter(ch, 0) + _, err := chunkWriter.Write([]byte{1, 2, 3}) + require.NoError(t, err) + _, err = chunkWriter.Write([]byte{4, 5, 6}) + require.NoError(t, err) + err = chunkWriter.Close() + require.NoError(t, err) + }() + assert.Equal(t, [][]byte{{1, 2, 3, 4, 5, 6}}, readChunks(ch)) + + // closing with error should return the error + theErr := errors.New("boom") + ch = make(chan io.ReadCloser, 100) + go func() { + chunkWriter := snapshots.NewChunkWriter(ch, 2) + _, err := chunkWriter.Write([]byte{1, 2, 3}) + require.NoError(t, err) + chunkWriter.CloseWithError(theErr) + }() + chunk, err := ioutil.ReadAll(<-ch) + require.NoError(t, err) + assert.Equal(t, []byte{1, 2}, chunk) + _, err = ioutil.ReadAll(<-ch) + require.Error(t, err) + assert.Equal(t, theErr, err) + assert.Empty(t, ch) + + // closing immediately should return no chunks + ch = make(chan io.ReadCloser, 100) + chunkWriter := snapshots.NewChunkWriter(ch, 2) + err = chunkWriter.Close() + require.NoError(t, err) + assert.Empty(t, ch) +} + +func TestChunkReader(t *testing.T) { + + ch := makeChunks([][]byte{ + {1, 2, 3}, + {4}, + {}, + {5, 6}, + }) + chunkReader := snapshots.NewChunkReader(ch) + + buf := []byte{0, 0, 0, 0} + n, err := chunkReader.Read(buf) + require.NoError(t, err) + assert.Equal(t, 3, n) + assert.Equal(t, []byte{1, 2, 3, 0}, buf) + + buf = []byte{0, 0, 0, 0} + n, err = chunkReader.Read(buf) + require.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, []byte{4, 0, 0, 0}, buf) + + buf = []byte{0, 0, 0, 0} + n, err = chunkReader.Read(buf) + require.NoError(t, err) + assert.Equal(t, 2, n) + assert.Equal(t, []byte{5, 6, 0, 0}, buf) + + buf = []byte{0, 0, 0, 0} + _, err = chunkReader.Read(buf) + require.Error(t, err) + assert.Equal(t, io.EOF, err) + + err = chunkReader.Close() + require.NoError(t, err) + + err = chunkReader.Close() // closing twice should be fine + require.NoError(t, err) + + // Empty channel should be fine + ch = makeChunks(nil) + chunkReader = snapshots.NewChunkReader(ch) + buf = make([]byte, 4) + _, err = chunkReader.Read(buf) + require.Error(t, err) + assert.Equal(t, io.EOF, err) + + // Using a pipe that closes with an error should return the error + theErr := errors.New("boom") + pr, pw := io.Pipe() + pch := make(chan io.ReadCloser, 1) + pch <- pr + pw.CloseWithError(theErr) + + chunkReader = snapshots.NewChunkReader(pch) + buf = make([]byte, 4) + _, err = chunkReader.Read(buf) + require.Error(t, err) + assert.Equal(t, theErr, err) + + // Closing the reader should close the writer + pr, pw = io.Pipe() + pch = make(chan io.ReadCloser, 2) + pch <- ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) + pch <- pr + close(pch) + + go func() { + chunkReader = snapshots.NewChunkReader(pch) + buf = []byte{0, 0, 0, 0} + _, err = chunkReader.Read(buf) + require.NoError(t, err) + assert.Equal(t, []byte{1, 2, 3, 0}, buf) + + err = chunkReader.Close() + require.NoError(t, err) + }() + + _, err = pw.Write([]byte{9, 9, 9}) + require.Error(t, err) + assert.Equal(t, err, io.ErrClosedPipe) +} diff --git a/go.mod b/go.mod index 0dd5157c4..a8c67e8a0 100644 --- a/go.mod +++ b/go.mod @@ -15,14 +15,19 @@ require ( github.com/gtank/merlin v0.1.1 github.com/libp2p/go-buffer-pool v0.0.2 github.com/minio/highwayhash v1.0.0 + github.com/pelletier/go-toml v1.6.0 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 github.com/rs/cors v1.7.0 github.com/sasha-s/go-deadlock v0.2.0 github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa + github.com/spf13/afero v1.2.1 // indirect github.com/spf13/cobra v1.0.0 + github.com/spf13/jwalterweatherman v1.1.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.7.1 + github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.6.1 github.com/tendermint/tm-db v0.6.2 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a diff --git a/go.sum b/go.sum index 6121fa2c1..1cabd4b45 100644 --- a/go.sum +++ b/go.sum @@ -362,6 +362,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4= +github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= @@ -449,6 +451,8 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.1 h1:qgMbHoJbPbw579P+1zVY+6n4nIFuIchaIjzZ/I/Yq8M= +github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= @@ -458,9 +462,13 @@ github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8= github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= +github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= @@ -471,6 +479,8 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=