From 9e643f3628446617fe00eeb408f4173449f6ebb2 Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Tue, 29 Mar 2022 12:31:37 -0400
Subject: [PATCH] migration: remove stale seen commits (#8205)

---
 cmd/tendermint/commands/key_migrate.go |   8 ++
 go.mod                                 |   4 +-
 scripts/scmigrate/migrate.go           | 203 +++++++++++++++++++++++
 scripts/scmigrate/migrate_test.go      | 213 ++++++++++++++++++++++++
 4 files changed, 426 insertions(+), 2 deletions(-)
 create mode 100644 scripts/scmigrate/migrate.go
 create mode 100644 scripts/scmigrate/migrate_test.go

diff --git a/cmd/tendermint/commands/key_migrate.go b/cmd/tendermint/commands/key_migrate.go
index 928821586..5866be341 100644
--- a/cmd/tendermint/commands/key_migrate.go
+++ b/cmd/tendermint/commands/key_migrate.go
@@ -9,6 +9,7 @@ import (
 	cfg "github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/scripts/keymigrate"
+	"github.com/tendermint/tendermint/scripts/scmigrate"
 )
 
 func MakeKeyMigrateCommand(conf *cfg.Config, logger log.Logger) *cobra.Command {
@@ -51,6 +52,13 @@ func MakeKeyMigrateCommand(conf *cfg.Config, logger log.Logger) *cobra.Command {
 					return fmt.Errorf("running migration for context %q: %w",
 						dbctx, err)
 				}
+
+				if dbctx == "blockstore" {
+					if err := scmigrate.Migrate(ctx, db); err != nil {
+						return fmt.Errorf("running seen commit migration: %w", err)
+
+					}
+				}
 			}
 
 			logger.Info("completed database migration successfully")
diff --git a/go.mod b/go.mod
index c4c4529d4..ed88618e0 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,6 @@ require (
 	github.com/go-kit/kit v0.12.0
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
-	github.com/golangci/golangci-lint v1.45.2
 	github.com/google/orderedcode v0.0.1
 	github.com/google/uuid v1.3.0
 	github.com/gorilla/websocket v1.5.0
@@ -30,7 +29,6 @@ require (
 	github.com/spf13/viper v1.10.1
 	github.com/stretchr/testify v1.7.1
 	github.com/tendermint/tm-db v0.6.6
-	github.com/vektra/mockery/v2 v2.10.0
 	golang.org/x/crypto v0.0.0-20220214200702-86341886e292
 	golang.org/x/net v0.0.0-20211208012354-db4efeb81f4b
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
@@ -41,7 +39,9 @@ require (
 
 require (
 	github.com/creachadair/atomicfile v0.2.4
+	github.com/golangci/golangci-lint v1.45.2
 	github.com/google/go-cmp v0.5.7
+	github.com/vektra/mockery/v2 v2.10.0
 	gotest.tools v2.2.0+incompatible
 )
 
diff --git a/scripts/scmigrate/migrate.go b/scripts/scmigrate/migrate.go
new file mode 100644
index 000000000..e6eee3d95
--- /dev/null
+++ b/scripts/scmigrate/migrate.go
@@ -0,0 +1,203 @@
+// Package scmigrate implements a migration for SeenCommit data
+// between 0.34 and 0.35
+//
+// The Migrate implementation is idempotent and finds all seen commit
+// records and deletes all *except* the record corresponding to the
+// highest height.
+package scmigrate
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"sort"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/google/orderedcode"
+	dbm "github.com/tendermint/tm-db"
+
+	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
+	"github.com/tendermint/tendermint/types"
+)
+
+// toMigrate pairs the database key of a seen commit record with the
+// commit decoded from its value.
+type toMigrate struct {
+	key    []byte
+	commit *types.Commit
+}
+
+// prefixSeenCommit is the leading key element of seen commit records.
+const prefixSeenCommit = int64(3)
+
+// makeKeyFromPrefix encodes ids, in order, as one orderedcode key. It
+// panics if the encoding fails.
+func makeKeyFromPrefix(ids ...int64) []byte {
+	vals := make([]interface{}, len(ids))
+	for idx := range ids {
+		vals[idx] = ids[idx]
+	}
+
+	key, err := orderedcode.Append(nil, vals...)
+	if err != nil {
+		panic(err)
+	}
+	return key
+}
+
+// makeToMigrate decodes the stored value of a seen commit record. It
+// fails for an empty value, for bytes that do not unmarshal, and when
+// the protobuf conversion produces no commit. A commit that was
+// produced is returned even if the conversion also reported an error.
+func makeToMigrate(val []byte) (*types.Commit, error) {
+	if len(val) == 0 {
+		return nil, errors.New("empty value")
+	}
+
+	var pbc = new(tmproto.Commit)
+
+	if err := proto.Unmarshal(val, pbc); err != nil {
+		return nil, fmt.Errorf("error reading block seen commit: %w", err)
+	}
+
+	commit, err := types.CommitFromProto(pbc)
+	if commit == nil {
+		// theoretically we should error for all errors, but
+		// there's no reason to keep junk data in the
+		// database, and it makes testing easier.
+		if err != nil {
+			return nil, fmt.Errorf("error from proto commit: %w", err)
+		}
+		return nil, errors.New("missing commit")
+	}
+
+	return commit, nil
+}
+
+// sortMigrations orders scData in place from the highest commit
+// height to the lowest.
+func sortMigrations(scData []toMigrate) {
+	// put this in its own function just to make it testable
+	sort.SliceStable(scData, func(i, j int) bool {
+		return scData[i].commit.Height > scData[j].commit.Height
+	})
+}
+
+// getMigrationsToDelete returns in without its first record, which
+// is the one to keep once in has been sorted. An empty input is
+// returned unchanged instead of panicking.
+func getMigrationsToDelete(in []toMigrate) []toMigrate {
+	if len(in) == 0 {
+		return in
+	}
+	return in[1:]
+}
+
+// getAllSeenCommits returns the decoded commit and a copy of the key
+// for every record stored under the seen commit prefix. It gives up
+// with the context's error as soon as ctx is done.
+func getAllSeenCommits(ctx context.Context, db dbm.DB) (out []toMigrate, err error) {
+	scKeyPrefix := makeKeyFromPrefix(prefixSeenCommit)
+	iter, err := db.Iterator(
+		scKeyPrefix,
+		makeKeyFromPrefix(prefixSeenCommit+1),
+	)
+	if err != nil {
+		return nil, err
+	}
+	// close the iterator on every path, including the early returns
+	// below; a close failure is reported unless another error is
+	// already on its way out.
+	defer func() {
+		if cerr := iter.Close(); cerr != nil && err == nil {
+			out, err = nil, cerr
+		}
+	}()
+
+	scData := []toMigrate{}
+	for ; iter.Valid(); iter.Next() {
+		if err := ctx.Err(); err != nil {
+			return nil, err
+		}
+
+		k := iter.Key()
+		if !bytes.HasPrefix(k, scKeyPrefix) {
+			break
+		}
+
+		commit, err := makeToMigrate(iter.Value())
+		if err != nil {
+			return nil, err
+		}
+
+		// store a private copy of the key rather than the slice
+		// handed out by the iterator.
+		nk := make([]byte, len(k))
+		copy(nk, k)
+
+		scData = append(scData, toMigrate{
+			key:    nk,
+			commit: commit,
+		})
+	}
+	if err := iter.Error(); err != nil {
+		return nil, err
+	}
+	return scData, nil
+}
+
+// deleteRecords removes the keys of all records in scData from db
+// with one synchronously written batch. Nothing is written if ctx is
+// already done.
+func deleteRecords(ctx context.Context, db dbm.DB, scData []toMigrate) (err error) {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	// delete all the remaining stale values in a single batch
+	batch := db.NewBatch()
+	// release the batch on every path; a close failure is reported
+	// unless another error is already on its way out.
+	defer func() {
+		if cerr := batch.Close(); cerr != nil && err == nil {
+			err = cerr
+		}
+	}()
+
+	for _, mg := range scData {
+		if err := batch.Delete(mg.key); err != nil {
+			return err
+		}
+	}
+
+	return batch.WriteSync()
+}
+
+// Migrate deletes every seen commit record in db except the one with
+// the highest height. A database holding at most one seen commit is
+// left untouched, so running Migrate again is a no-op.
+func Migrate(ctx context.Context, db dbm.DB) error {
+	scData, err := getAllSeenCommits(ctx, db)
+	if err != nil {
+		return fmt.Errorf("sourcing tasks to migrate: %w", err)
+	}
+
+	// sort latest->earliest commits.
+	sortMigrations(scData)
+
+	// trim the one we want to save:
+	scData = getMigrationsToDelete(scData)
+
+	// zero or one seen commits: nothing is stale.
+	if len(scData) == 0 {
+		return nil
+	}
+
+	// write the migration (remove the stale records)
+	if err := deleteRecords(ctx, db, scData); err != nil {
+		return fmt.Errorf("writing data: %w", err)
+	}
+
+	return nil
+}
diff --git a/scripts/scmigrate/migrate_test.go b/scripts/scmigrate/migrate_test.go
new file mode 100644
index 000000000..abe12584d
--- /dev/null
+++ b/scripts/scmigrate/migrate_test.go
@@ -0,0 +1,213 @@
+package scmigrate
+
+import (
+	"context"
+	"math/rand"
+	"testing"
+
+	"github.com/gogo/protobuf/proto"
+	dbm "github.com/tendermint/tm-db"
+
+	"github.com/tendermint/tendermint/types"
+)
+
+// appendRandomMigrations appends up to num records with random
+// positive heights (and no keys) to in.
+func appendRandomMigrations(in []toMigrate, num int) []toMigrate {
+	if in == nil {
+		in = []toMigrate{}
+	}
+
+	for i := 0; i < num; i++ {
+		height := rand.Int63()
+		if height <= 0 {
+			continue
+		}
+		in = append(in, toMigrate{commit: &types.Commit{Height: height}})
+	}
+	return in
+}
+
+// assertWellOrderedMigrations checks that testData is sorted by
+// descending height and that its last record has height 0.
+func assertWellOrderedMigrations(t *testing.T, testData []toMigrate) {
+	t.Run("ValuesDescend", func(t *testing.T) {
+		for idx := range testData {
+			height := testData[idx].commit.Height
+			if idx == 0 {
+				continue
+			}
+			prev := testData[idx-1].commit.Height
+			if prev < height {
+				t.Fatal("height increased in sort order")
+			}
+		}
+	})
+	t.Run("EarliestIsZero", func(t *testing.T) {
+		earliestHeight := testData[len(testData)-1].commit.Height
+		if earliestHeight != 0 {
+			t.Fatalf("the earliest height is not 0: %d", earliestHeight)
+		}
+	})
+}
+
+// getLatestHeight returns the largest height in data, or 0 if no
+// record has a positive height.
+func getLatestHeight(data []toMigrate) int64 {
+	var out int64
+
+	for _, d := range data {
+		if d.commit.Height >= out {
+			out = d.commit.Height
+		}
+	}
+
+	return out
+}
+
+// insertTestData stores each commit in data under the seen commit
+// prefix, keyed by its index in data.
+func insertTestData(t *testing.T, db dbm.DB, data []toMigrate) {
+	t.Helper()
+
+	batch := db.NewBatch()
+
+	for idx, val := range data {
+		payload, err := proto.Marshal(val.commit.ToProto())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if err := batch.Set(makeKeyFromPrefix(prefixSeenCommit, int64(idx)), payload); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := batch.WriteSync(); err != nil {
+		t.Fatal(err)
+	}
+	if err := batch.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestMigrations(t *testing.T) {
+	t.Run("Sort", func(t *testing.T) {
+		t.Run("HandCraftedData", func(t *testing.T) {
+			testData := []toMigrate{
+				{commit: &types.Commit{Height: 100}},
+				{commit: &types.Commit{Height: 0}},
+				{commit: &types.Commit{Height: 8}},
+				{commit: &types.Commit{Height: 1}},
+			}
+
+			sortMigrations(testData)
+			assertWellOrderedMigrations(t, testData)
+		})
+		t.Run("RandomGeneratedData", func(t *testing.T) {
+			testData := []toMigrate{{commit: &types.Commit{Height: 0}}}
+
+			testData = appendRandomMigrations(testData, 10000)
+
+			sortMigrations(testData)
+			assertWellOrderedMigrations(t, testData)
+		})
+	})
+	t.Run("GetMigrationsToDelete", func(t *testing.T) {
+		if got := getMigrationsToDelete(nil); len(got) != 0 {
+			t.Fatalf("empty input should leave nothing to delete, got %d", len(got))
+		}
+		for i := 1; i < 100; i++ {
+			data := appendRandomMigrations([]toMigrate{}, i)
+			toMigrate := getMigrationsToDelete(data)
+			if len(data) != len(toMigrate)+1 {
+				t.Fatalf("migration prep did not save one document [original=%d migrations=%d]", len(data), len(toMigrate))
+			}
+		}
+	})
+	t.Run("InvalidMigrations", func(t *testing.T) {
+		if _, err := makeToMigrate(nil); err == nil {
+			t.Fatal("should error for nil migrations")
+		}
+		if _, err := makeToMigrate([]byte{}); err == nil {
+			t.Fatal("should error for empty migrations")
+		}
+		if _, err := makeToMigrate([]byte("invalid")); err == nil {
+			t.Fatal("should error for invalid migrations")
+		}
+	})
+
+	t.Run("GetSeenCommits", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		db := dbm.NewMemDB()
+		data := appendRandomMigrations([]toMigrate{}, 100)
+		insertTestData(t, db, data)
+		commits, err := getAllSeenCommits(ctx, db)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if len(commits) != len(data) {
+			t.Log("inputs", len(data))
+			t.Log("commits", len(commits))
+			t.Fatal("migrations not found in database")
+		}
+	})
+	t.Run("Integration", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		db := dbm.NewMemDB()
+		data := appendRandomMigrations([]toMigrate{}, 1000)
+		insertTestData(t, db, data)
+
+		latestHeight := getLatestHeight(data)
+		for _, test := range []string{"Migration", "Idempotency"} {
+			// run the test twice to make sure that it's
+			// safe to rerun
+			t.Run(test, func(t *testing.T) {
+				if err := Migrate(ctx, db); err != nil {
+					t.Fatal(err)
+				}
+
+				post, err := getAllSeenCommits(ctx, db)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if len(post) != 1 {
+					t.Fatal("migration was not successful")
+				}
+				if post[0].commit.Height != latestHeight {
+					t.Fatal("migration did not save correct document")
+				}
+			})
+		}
+	})
+	t.Run("SmallDatabases", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		// an empty database must not panic, a single record must
+		// survive, and two records is the smallest real migration.
+		for _, tc := range []struct{ records, remaining int }{{0, 0}, {1, 1}, {2, 1}, {3, 1}} {
+			db := dbm.NewMemDB()
+			data := appendRandomMigrations([]toMigrate{}, tc.records)
+			insertTestData(t, db, data)
+
+			if err := Migrate(ctx, db); err != nil {
+				t.Fatal(err)
+			}
+
+			post, err := getAllSeenCommits(ctx, db)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if len(post) != tc.remaining {
+				t.Fatalf("unexpected record count [records=%d remaining=%d]", tc.records, len(post))
+			}
+			if len(post) == 1 && post[0].commit.Height != getLatestHeight(data) {
+				t.Fatal("migration did not save correct document")
+			}
+		}
+	})
+}