From 0c2c0afaf8ed5380cd9992113fd4d84edae55f2b Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Fri, 16 Apr 2021 11:53:43 -0400
Subject: [PATCH] db: migration script for key format change (#6355)

---
 scripts/keymigrate/migrate.go      | 402 +++++++++++++++++++++++++++++
 scripts/keymigrate/migrate_test.go | 241 +++++++++++++++++
 2 files changed, 643 insertions(+)
 create mode 100644 scripts/keymigrate/migrate.go
 create mode 100644 scripts/keymigrate/migrate_test.go

diff --git a/scripts/keymigrate/migrate.go b/scripts/keymigrate/migrate.go
new file mode 100644
index 000000000..7833210a4
--- /dev/null
+++ b/scripts/keymigrate/migrate.go
@@ -0,0 +1,402 @@
+// Package keymigrate translates all legacy-formatted keys into the
+// new format.
+//
+// The migration operation, as implemented, provides a potential
+// model for future database migrations; crucially, it does not
+// depend on any tendermint code.
+package keymigrate
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"math/rand"
+	"runtime"
+	"strconv"
+	"sync"
+
+	"github.com/google/orderedcode"
+	"github.com/pkg/errors"
+	dbm "github.com/tendermint/tm-db"
+)
+
+type (
+	keyID       []byte
+	migrateFunc func(keyID) (keyID, error)
+)
+
+func getAllLegacyKeys(db dbm.DB) ([]keyID, error) {
+	out := []keyID{}
+
+	iter, err := db.Iterator(nil, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	for ; iter.Valid(); iter.Next() {
+		k := iter.Key()
+
+		// Make sure it's a key with a legacy format, and skip
+		// all other keys, so that it's safe to resume an
+		// interrupted migration.
+		if !keyIsLegacy(k) {
+			continue
+		}
+
+		// tm-db backends are inconsistent about whether the
+		// slice returned by iter.Key() remains valid after
+		// the iterator advances, so take a copy.
+		nk := make([]byte, len(k))
+		copy(nk, k)
+		out = append(out, nk)
+	}
+
+	if err = iter.Error(); err != nil {
+		return nil, err
+	}
+
+	if err = iter.Close(); err != nil {
+		return nil, err
+	}
+
+	return out, nil
+}
+
+func makeKeyChan(keys []keyID) <-chan keyID {
+	out := make(chan keyID, len(keys))
+	defer close(out)
+
+	for _, key := range keys {
+		out <- key
+	}
+
+	return out
+}
+
+func keyIsLegacy(key keyID) bool {
+	for _, prefix := range []keyID{
+		// core "store"
+		keyID("consensusParamsKey:"),
+		keyID("abciResponsesKey:"),
+		keyID("validatorsKey:"),
+		keyID("stateKey"),
+		keyID("H:"),
+		keyID("P:"),
+		keyID("C:"),
+		keyID("SC:"),
+		keyID("BH:"),
+		// light
+		keyID("size"),
+		keyID("lb/"),
+		// evidence
+		keyID([]byte{0x00}),
+		keyID([]byte{0x01}),
+		// tx index
+		keyID("tx.height/"),
+		keyID("tx.hash/"),
+	} {
+		if bytes.HasPrefix(key, prefix) {
+			return true
+		}
+	}
+
+	// A key with at least three '/'-separated segments is a
+	// legacy tx indexer event key.
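+	// For example, a legacy event key from the test fixtures
+	// looks like "foo/bar/8/8" (attribute key, attribute value,
+	// height, index) and so contains at least three separators;
+	// "foo" and "bar" are illustrative fixture names, not real
+	// event attributes.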
+	if bytes.Count(key, []byte("/")) >= 3 {
+		return true
+	}
+
+	return keyIsHash(key)
+}
+
+func keyIsHash(key keyID) bool {
+	return len(key) == 32 && !bytes.Contains(key, []byte("/"))
+}
+
+func migrateKey(key keyID) (keyID, error) {
+	switch {
+	case bytes.HasPrefix(key, keyID("H:")):
+		val, err := strconv.Atoi(string(key[2:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(0), int64(val))
+	case bytes.HasPrefix(key, keyID("P:")):
+		parts := bytes.Split(key[2:], []byte(":"))
+		if len(parts) != 2 {
+			return nil, fmt.Errorf("block parts key has %d rather than 2 components",
+				len(parts))
+		}
+		valOne, err := strconv.Atoi(string(parts[0]))
+		if err != nil {
+			return nil, err
+		}
+
+		valTwo, err := strconv.Atoi(string(parts[1]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(1), int64(valOne), int64(valTwo))
+	case bytes.HasPrefix(key, keyID("C:")):
+		val, err := strconv.Atoi(string(key[2:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(2), int64(val))
+	case bytes.HasPrefix(key, keyID("SC:")):
+		val, err := strconv.Atoi(string(key[3:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(3), int64(val))
+	case bytes.HasPrefix(key, keyID("BH:")):
+		val, err := strconv.Atoi(string(key[3:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(4), int64(val))
+	case bytes.HasPrefix(key, keyID("validatorsKey:")):
+		val, err := strconv.Atoi(string(key[14:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(5), int64(val))
+	case bytes.HasPrefix(key, keyID("consensusParamsKey:")):
+		val, err := strconv.Atoi(string(key[19:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(6), int64(val))
+	case bytes.HasPrefix(key, keyID("abciResponsesKey:")):
+		val, err := strconv.Atoi(string(key[17:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(7), int64(val))
+	case bytes.HasPrefix(key, keyID("stateKey")):
+		return orderedcode.Append(nil, int64(8))
+	case bytes.HasPrefix(key, []byte{0x00}): // committed evidence
+		return convertEvidence(key, 9)
+	case bytes.HasPrefix(key, []byte{0x01}): // pending evidence
+		return convertEvidence(key, 10)
+	case bytes.HasPrefix(key, keyID("lb/")):
+		if len(key) < 24 {
+			return nil, fmt.Errorf("light block key %q is in an invalid format", string(key))
+		}
+
+		val, err := strconv.Atoi(string(key[len(key)-20:]))
+		if err != nil {
+			return nil, err
+		}
+
+		return orderedcode.Append(nil, int64(11), int64(val))
+	case bytes.HasPrefix(key, keyID("size")):
+		return orderedcode.Append(nil, int64(12))
+	case bytes.HasPrefix(key, keyID("tx.height")):
+		parts := bytes.Split(key, []byte("/"))
+		if len(parts) != 4 {
+			return nil, fmt.Errorf("key has %d parts rather than 4", len(parts))
+		}
+		parts = parts[1:] // drop prefix
+
+		elems := make([]interface{}, 0, len(parts)+1)
+		elems = append(elems, "tx.height")
+
+		for idx, pt := range parts {
+			val, err := strconv.Atoi(string(pt))
+			if err != nil {
+				return nil, err
+			}
+			if idx == 0 {
+				elems = append(elems, fmt.Sprintf("%d", val))
+			} else {
+				elems = append(elems, int64(val))
+			}
+		}
+
+		return orderedcode.Append(nil, elems...)
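+	// The event-key case below handles two legacy shapes from the
+	// test fixtures: a four-part key such as "foo/bar/8/8" maps
+	// to ("foo", "bar", 8, 8), while a longer key such as
+	// "foo/bar/baz/8/8" keeps the middle segments together as the
+	// single element "bar/baz". The names are illustrative, not
+	// real event attributes.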
+	case bytes.Count(key, []byte("/")) >= 3: // tx indexer
+		parts := bytes.Split(key, []byte("/"))
+
+		elems := make([]interface{}, 0, 4)
+		if len(parts) == 4 {
+			elems = append(elems, string(parts[0]), string(parts[1]))
+
+			val, err := strconv.Atoi(string(parts[2]))
+			if err != nil {
+				return nil, err
+			}
+			elems = append(elems, int64(val))
+
+			val2, err := strconv.Atoi(string(parts[3]))
+			if err != nil {
+				return nil, err
+			}
+			elems = append(elems, int64(val2))
+		} else {
+			elems = append(elems, string(parts[0]))
+			parts = parts[1:]
+
+			val, err := strconv.Atoi(string(parts[len(parts)-1]))
+			if err != nil {
+				return nil, err
+			}
+
+			val2, err := strconv.Atoi(string(parts[len(parts)-2]))
+			if err != nil {
+				return nil, err
+			}
+
+			// Join all middle segments back into a single
+			// attribute-value element; the final two
+			// segments are the height and index.
+			appKey := bytes.Join(parts[:len(parts)-2], []byte("/"))
+			elems = append(elems, string(appKey), int64(val), int64(val2))
+		}
+		return orderedcode.Append(nil, elems...)
+	case keyIsHash(key):
+		return orderedcode.Append(nil, "tx.hash", string(key))
+	default:
+		return nil, fmt.Errorf("key %q is in the wrong format", string(key))
+	}
+}
+
+func convertEvidence(key keyID, newPrefix int64) ([]byte, error) {
+	parts := bytes.Split(key[1:], []byte("/"))
+	if len(parts) != 2 {
+		return nil, fmt.Errorf("evidence key is malformed with %d parts not 2",
+			len(parts))
+	}
+
+	hb, err := hex.DecodeString(string(parts[0]))
+	if err != nil {
+		return nil, err
+	}
+
+	evidenceHash, err := hex.DecodeString(string(parts[1]))
+	if err != nil {
+		return nil, err
+	}
+
+	return orderedcode.Append(nil, newPrefix, binary.BigEndian.Uint64(hb), string(evidenceHash))
+}
+
+func replaceKey(db dbm.DB, key keyID, gooseFn migrateFunc) error {
+	exists, err := db.Has(key)
+	if err != nil {
+		return err
+	}
+	if !exists {
+		return nil
+	}
+
+	newKey, err := gooseFn(key)
+	if err != nil {
+		return err
+	}
+
+	val, err := db.Get(key)
+	if err != nil {
+		return err
+	}
+
+	batch := db.NewBatch()
+
+	if err = batch.Set(newKey, val); err != nil {
+		return err
+	}
+	if err = batch.Delete(key); err != nil {
+		return err
+	}
+
+	// 10% of the time, force a write to disk, but mostly don't,
+	// because it's faster.
+	if rand.Intn(100)%10 == 0 { // nolint:gosec
+		if err = batch.WriteSync(); err != nil {
+			return err
+		}
+	} else {
+		if err = batch.Write(); err != nil {
+			return err
+		}
+	}
+
+	if err = batch.Close(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Migrate converts all legacy key formats to new key formats. The
+// operation is idempotent, so it's safe to resume a failed
+// operation. The operation is somewhat parallelized, relying on the
+// concurrency safety of the underlying databases.
+//
+// Migrate has "continue on error" semantics: it iterates through all
+// legacy keys, attempts to migrate each one, collects any errors
+// along the way, and reports them only at the end of the operation.
+//
+// The context allows for safe termination of the operation (e.g.
+// when connected to a signal handler) by aborting between individual
+// key migrations.
+func Migrate(ctx context.Context, db dbm.DB) error {
+	keys, err := getAllLegacyKeys(db)
+	if err != nil {
+		return err
+	}
+
+	numWorkers := runtime.NumCPU()
+	wg := &sync.WaitGroup{}
+
+	errs := make(chan error, numWorkers)
+
+	keyCh := makeKeyChan(keys)
+
+	// run migrations.
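+	// Fan out one worker per CPU: each worker drains the
+	// pre-filled key channel and reports any failure on errs,
+	// which the collector goroutine below folds into errStrs.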
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for key := range keyCh {
+				err := replaceKey(db, key, migrateKey)
+				if err != nil {
+					errs <- err
+				}
+
+				if ctx.Err() != nil {
+					return
+				}
+			}
+		}()
+	}
+
+	// collect and process the errors.
+	errStrs := []string{}
+	signal := make(chan struct{})
+	go func() {
+		defer close(signal)
+		for err := range errs {
+			if err == nil {
+				continue
+			}
+			errStrs = append(errStrs, err.Error())
+		}
+	}()
+
+	// Wait for everything to be done.
+	wg.Wait()
+	close(errs)
+	<-signal
+
+	// check the error results
+	if len(errStrs) != 0 {
+		return errors.Errorf("encountered errors during migration: %v", errStrs)
+	}
+
+	return nil
+}
diff --git a/scripts/keymigrate/migrate_test.go b/scripts/keymigrate/migrate_test.go
new file mode 100644
index 000000000..21e9592fb
--- /dev/null
+++ b/scripts/keymigrate/migrate_test.go
@@ -0,0 +1,241 @@
+package keymigrate
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"testing"
+
+	"github.com/google/orderedcode"
+	"github.com/stretchr/testify/require"
+	dbm "github.com/tendermint/tm-db"
+)
+
+func makeKey(t *testing.T, elems ...interface{}) []byte {
+	t.Helper()
+	out, err := orderedcode.Append([]byte{}, elems...)
+	require.NoError(t, err)
+	return out
+}
+
+func getLegacyPrefixKeys(val int) map[string][]byte {
+	return map[string][]byte{
+		"Height":            []byte(fmt.Sprintf("H:%d", val)),
+		"BlockPart":         []byte(fmt.Sprintf("P:%d:%d", val, val)),
+		"BlockPartTwo":      []byte(fmt.Sprintf("P:%d:%d", val+2, val+val)),
+		"BlockCommit":       []byte(fmt.Sprintf("C:%d", val)),
+		"SeenCommit":        []byte(fmt.Sprintf("SC:%d", val)),
+		"BlockHeight":       []byte(fmt.Sprintf("BH:%d", val)),
+		"Validators":        []byte(fmt.Sprintf("validatorsKey:%d", val)),
+		"ConsensusParams":   []byte(fmt.Sprintf("consensusParamsKey:%d", val)),
+		"ABCIResponse":      []byte(fmt.Sprintf("abciResponsesKey:%d", val)),
+		"State":             []byte("stateKey"),
+		"CommittedEvidence": append([]byte{0x00}, []byte(fmt.Sprintf("%0.16X/%X", int64(val), []byte("committed")))...),
+		"PendingEvidence":   append([]byte{0x01}, []byte(fmt.Sprintf("%0.16X/%X", int64(val), []byte("pending")))...),
+		"LightBlock":        []byte(fmt.Sprintf("lb/foo/%020d", val)),
+		"Size":              []byte("size"),
+		"UserKey0":          []byte(fmt.Sprintf("foo/bar/%d/%d", val, val)),
+		"UserKey1":          []byte(fmt.Sprintf("foo/bar/baz/%d/%d", val, val)),
+		"TxHeight":          []byte(fmt.Sprintf("tx.height/%s/%d/%d", fmt.Sprint(val), val, val)),
+		"TxHash": append(
+			bytes.Repeat([]byte{fmt.Sprint(val)[0]}, 16),
+			bytes.Repeat([]byte{fmt.Sprint(val)[len([]byte(fmt.Sprint(val)))-1]}, 16)...,
+		),
+	}
+}
+
+func getNewPrefixKeys(t *testing.T, val int) map[string][]byte {
+	t.Helper()
+	return map[string][]byte{
+		"Height":            makeKey(t, int64(0), int64(val)),
+		"BlockPart":         makeKey(t, int64(1), int64(val), int64(val)),
+		"BlockPartTwo":      makeKey(t, int64(1), int64(val+2), int64(val+val)),
+		"BlockCommit":       makeKey(t, int64(2), int64(val)),
+		"SeenCommit":        makeKey(t, int64(3), int64(val)),
+		"BlockHeight":       makeKey(t, int64(4), int64(val)),
+		"Validators":        makeKey(t, int64(5), int64(val)),
+		"ConsensusParams":   makeKey(t, int64(6), int64(val)),
+		"ABCIResponse":      makeKey(t, int64(7), int64(val)),
+		"State":             makeKey(t, int64(8)),
+		"CommittedEvidence": makeKey(t, int64(9), int64(val)),
+		"PendingEvidence":   makeKey(t, int64(10), int64(val)),
+		"LightBlock":        makeKey(t, int64(11), int64(val)),
+		"Size":              makeKey(t, int64(12)),
+		"UserKey0":          makeKey(t, "foo", "bar", int64(val), int64(val)),
+		"UserKey1":          makeKey(t, "foo", "bar/baz", int64(val), int64(val)),
+		"TxHeight":          makeKey(t, "tx.height", fmt.Sprint(val), int64(val), int64(val)),
+		"TxHash":            makeKey(t, "tx.hash", string(bytes.Repeat([]byte{[]byte(fmt.Sprint(val))[0]}, 32))),
+	}
+}
+
+func getLegacyDatabase(t *testing.T) (int, dbm.DB) {
+	db := dbm.NewMemDB()
+	batch := db.NewBatch()
+	ct := 0
+
+	generated := []map[string][]byte{
+		getLegacyPrefixKeys(8),
+		getLegacyPrefixKeys(9001),
+		getLegacyPrefixKeys(math.MaxInt32 << 1),
+		getLegacyPrefixKeys(math.MaxInt64 - 8),
+	}
+
+	// populate database
+	for _, km := range generated {
+		for _, key := range km {
+			ct++
+			require.NoError(t, batch.Set(key, []byte(fmt.Sprintf(`{"value": %d}`, ct))))
+		}
+	}
+	require.NoError(t, batch.WriteSync())
+	require.NoError(t, batch.Close())
+
+	// The "State" and "Size" keys are constant across the
+	// generated maps, so all but one of each are overwritten;
+	// return the number of unique keys.
+	return ct - (2 * len(generated)) + 2, db
+}
+
+func TestMigration(t *testing.T) {
+	t.Run("Idempotency", func(t *testing.T) {
+		// we want to make sure that the key spaces for new
+		// and legacy keys are entirely non-overlapping.
+
+		legacyPrefixes := getLegacyPrefixKeys(42)
+
+		newPrefixes := getNewPrefixKeys(t, 42)
+
+		require.Equal(t, len(legacyPrefixes), len(newPrefixes))
+
+		t.Run("Legacy", func(t *testing.T) {
+			for kind, le := range legacyPrefixes {
+				require.True(t, keyIsLegacy(le), kind)
+			}
+		})
+		t.Run("New", func(t *testing.T) {
+			for kind, ne := range newPrefixes {
+				require.False(t, keyIsLegacy(ne), kind)
+			}
+		})
+		t.Run("Conversion", func(t *testing.T) {
+			for kind, le := range legacyPrefixes {
+				nk, err := migrateKey(le)
+				require.NoError(t, err, kind)
+				require.False(t, keyIsLegacy(nk), kind)
+			}
+		})
+		t.Run("Hashes", func(t *testing.T) {
+			t.Run("NewKeysAreNotHashes", func(t *testing.T) {
+				for _, key := range getNewPrefixKeys(t, 9001) {
+					require.True(t, len(key) != 32)
+				}
+			})
+			t.Run("ContrivedLegacyKeyDetection", func(t *testing.T) {
+				require.True(t, keyIsLegacy([]byte("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")))
+				require.False(t, keyIsLegacy([]byte("xxxxxxxxxxxxxxx/xxxxxxxxxxxxxxxx")))
+			})
+		})
+	})
+	t.Run("Migrations", func(t *testing.T) {
+		t.Run("Errors", func(t *testing.T) {
+			table := map[string][]byte{
+				"Height":          []byte(fmt.Sprintf("H:%f", 4.22222)),
+				"BlockPart":       []byte(fmt.Sprintf("P:%f", 4.22222)),
+				"BlockPartTwo":    []byte(fmt.Sprintf("P:%d", 42)),
+				"BlockPartThree":  []byte(fmt.Sprintf("P:%f:%f", 4.222, 8.444)),
+				"BlockPartFour":   []byte(fmt.Sprintf("P:%d:%f", 4222, 8.444)),
+				"BlockCommit":     []byte(fmt.Sprintf("C:%f", 4.22222)),
+				"SeenCommit":      []byte(fmt.Sprintf("SC:%f", 4.22222)),
+				"BlockHeight":     []byte(fmt.Sprintf("BH:%f", 4.22222)),
+				"Validators":      []byte(fmt.Sprintf("validatorsKey:%f", 4.22222)),
+				"ConsensusParams": []byte(fmt.Sprintf("consensusParamsKey:%f", 4.22222)),
+				"ABCIResponse":    []byte(fmt.Sprintf("abciResponsesKey:%f", 4.22222)),
+				"LightBlockShort": []byte(fmt.Sprintf("lb/foo/%010d", 42)),
+				"LightBlockLong":  []byte("lb/foo/12345678910.1234567890"),
+				"Invalid":         {0x03},
+				"BadTXHeight0":    []byte(fmt.Sprintf("tx.height/%s/%f/%f", "boop", 4.4, 4.5)),
+				"BadTXHeight1":    []byte(fmt.Sprintf("tx.height/%s/%f", "boop", 4.4)),
+				"UserKey0":        []byte("foo/bar/1.3/3.4"),
+				"UserKey1":        []byte("foo/bar/1/3.4"),
+				"UserKey2":        []byte("foo/bar/baz/1/3.4"),
+				"UserKey3":        []byte("foo/bar/baz/1.2/4"),
+			}
+			for kind, key := range table {
+				out, err := migrateKey(key)
+				require.Error(t, err, kind)
+				require.Nil(t, out, kind)
+			}
+		})
+		t.Run("Replacement", func(t *testing.T) {
+			t.Run("MissingKey", func(t *testing.T) {
+				db := dbm.NewMemDB()
+				require.NoError(t, replaceKey(db, keyID("hi"), nil))
+			})
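+			// The following subtests inject failures into
+			// replaceKey: a migration function that errors,
+			// and one that deletes the key mid-migration so
+			// the value read back is nil and the batched
+			// Set fails.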
t.Run("ReplacementFails", func(t *testing.T) { + db := dbm.NewMemDB() + key := keyID("hi") + require.NoError(t, db.Set(key, []byte("world"))) + require.Error(t, replaceKey(db, key, func(k keyID) (keyID, error) { + return nil, errors.New("hi") + })) + }) + t.Run("KeyDisapears", func(t *testing.T) { + db := dbm.NewMemDB() + key := keyID("hi") + require.NoError(t, db.Set(key, []byte("world"))) + require.Error(t, replaceKey(db, key, func(k keyID) (keyID, error) { + require.NoError(t, db.Delete(key)) + return keyID("wat"), nil + })) + + exists, err := db.Has(key) + require.NoError(t, err) + require.False(t, exists) + + exists, err = db.Has(keyID("wat")) + require.NoError(t, err) + require.False(t, exists) + }) + }) + }) + t.Run("Integration", func(t *testing.T) { + t.Run("KeyDiscovery", func(t *testing.T) { + size, db := getLegacyDatabase(t) + keys, err := getAllLegacyKeys(db) + require.NoError(t, err) + require.Equal(t, size, len(keys)) + legacyKeys := 0 + for _, k := range keys { + if keyIsLegacy(k) { + legacyKeys++ + } + } + require.Equal(t, size, legacyKeys) + }) + t.Run("KeyIdempotency", func(t *testing.T) { + for _, key := range getNewPrefixKeys(t, 84) { + require.False(t, keyIsLegacy(key)) + } + }) + t.Run("ChannelConversion", func(t *testing.T) { + ch := makeKeyChan([]keyID{ + makeKey(t, "abc", int64(2), int64(42)), + makeKey(t, int64(42)), + }) + count := 0 + for range ch { + count++ + } + require.Equal(t, 2, count) + }) + t.Run("Migrate", func(t *testing.T) { + _, db := getLegacyDatabase(t) + + ctx := context.Background() + err := Migrate(ctx, db) + require.NoError(t, err) + keys, err := getAllLegacyKeys(db) + require.NoError(t, err) + require.Equal(t, 0, len(keys)) + + }) + }) +}