mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-04 12:13:57 +00:00
This commit is contained in:
229
cmd/tendermint/commands/reindex_event.go
Normal file
229
cmd/tendermint/commands/reindex_event.go
Normal file
@@ -0,0 +1,229 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abcitypes "github.com/tendermint/tendermint/abci/types"
|
||||
tmcfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/progressbar"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink/psql"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/state/txindex/kv"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
reindexFailed = "event re-index failed: "
|
||||
)
|
||||
|
||||
var (
|
||||
ErrHeightNotAvailable = errors.New("height is not available")
|
||||
ErrInvalidRequest = errors.New("invalid request")
|
||||
)
|
||||
|
||||
// ReIndexEventCmd constructs a command to re-index events in a block height interval.
|
||||
var ReIndexEventCmd = &cobra.Command{
|
||||
Use: "reindex-event",
|
||||
Short: "reindex events to the event store backends",
|
||||
Long: `
|
||||
reindex-event is an offline tooling to re-index block and tx events to the eventsinks,
|
||||
you can run this command when the event store backend dropped/disconnected or you want to
|
||||
replace the backend. The default start-height is 0, meaning the tooling will start
|
||||
reindex from the base block height(inclusive); and the default end-height is 0, meaning
|
||||
the tooling will reindex until the latest block height(inclusive). User can omit
|
||||
either or both arguments.
|
||||
`,
|
||||
Example: `
|
||||
tendermint reindex-event
|
||||
tendermint reindex-event --start-height 2
|
||||
tendermint reindex-event --end-height 10
|
||||
tendermint reindex-event --start-height 2 --end-height 10
|
||||
`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
bs, ss, err := loadStateAndBlockStore(config)
|
||||
if err != nil {
|
||||
fmt.Println(reindexFailed, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkValidHeight(bs); err != nil {
|
||||
fmt.Println(reindexFailed, err)
|
||||
return
|
||||
}
|
||||
|
||||
bi, ti, err := loadEventSinks(config)
|
||||
if err != nil {
|
||||
fmt.Println(reindexFailed, err)
|
||||
return
|
||||
}
|
||||
|
||||
riArgs := eventReIndexArgs{
|
||||
startHeight: startHeight,
|
||||
endHeight: endHeight,
|
||||
blockIndexer: bi,
|
||||
txIndexer: ti,
|
||||
blockStore: bs,
|
||||
stateStore: ss,
|
||||
}
|
||||
if err := eventReIndex(cmd, riArgs); err != nil {
|
||||
panic(fmt.Errorf("%s: %w", reindexFailed, err))
|
||||
}
|
||||
|
||||
fmt.Println("event re-index finished")
|
||||
},
|
||||
}
|
||||
|
||||
var (
|
||||
startHeight int64
|
||||
endHeight int64
|
||||
)
|
||||
|
||||
func init() {
|
||||
ReIndexEventCmd.Flags().Int64Var(&startHeight, "start-height", 0, "the block height would like to start for re-index")
|
||||
ReIndexEventCmd.Flags().Int64Var(&endHeight, "end-height", 0, "the block height would like to finish for re-index")
|
||||
}
|
||||
|
||||
func loadEventSinks(cfg *tmcfg.Config) (indexer.BlockIndexer, txindex.TxIndexer, error) {
|
||||
switch strings.ToLower(cfg.TxIndex.Indexer) {
|
||||
case "null":
|
||||
return nil, nil, errors.New("found null event sink, please check the tx-index section in the config.toml")
|
||||
case "psql":
|
||||
conn := cfg.TxIndex.PsqlConn
|
||||
if conn == "" {
|
||||
return nil, nil, errors.New("the psql connection settings cannot be empty")
|
||||
}
|
||||
es, err := psql.NewEventSink(conn, cfg.ChainID())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return es.BlockIndexer(), es.TxIndexer(), nil
|
||||
case "kv":
|
||||
store, err := dbm.NewDB("tx_index", dbm.BackendType(cfg.DBBackend), cfg.DBDir())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
txIndexer := kv.NewTxIndex(store)
|
||||
blockIndexer := blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))
|
||||
return blockIndexer, txIndexer, nil
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unsupported event sink type: %s", cfg.TxIndex.Indexer)
|
||||
}
|
||||
}
|
||||
|
||||
type eventReIndexArgs struct {
|
||||
startHeight int64
|
||||
endHeight int64
|
||||
blockIndexer indexer.BlockIndexer
|
||||
txIndexer txindex.TxIndexer
|
||||
blockStore state.BlockStore
|
||||
stateStore state.Store
|
||||
}
|
||||
|
||||
func eventReIndex(cmd *cobra.Command, args eventReIndexArgs) error {
|
||||
var bar progressbar.Bar
|
||||
bar.NewOption(args.startHeight-1, args.endHeight)
|
||||
|
||||
fmt.Println("start re-indexing events:")
|
||||
defer bar.Finish()
|
||||
for i := args.startHeight; i <= args.endHeight; i++ {
|
||||
select {
|
||||
case <-cmd.Context().Done():
|
||||
return fmt.Errorf("event re-index terminated at height %d: %w", i, cmd.Context().Err())
|
||||
default:
|
||||
b := args.blockStore.LoadBlock(i)
|
||||
if b == nil {
|
||||
return fmt.Errorf("not able to load block at height %d from the blockstore", i)
|
||||
}
|
||||
|
||||
r, err := args.stateStore.LoadABCIResponses(i)
|
||||
if err != nil {
|
||||
return fmt.Errorf("not able to load ABCI Response at height %d from the statestore", i)
|
||||
}
|
||||
|
||||
e := types.EventDataNewBlockHeader{
|
||||
Header: b.Header,
|
||||
NumTxs: int64(len(b.Txs)),
|
||||
ResultBeginBlock: *r.BeginBlock,
|
||||
ResultEndBlock: *r.EndBlock,
|
||||
}
|
||||
|
||||
var batch *txindex.Batch
|
||||
if e.NumTxs > 0 {
|
||||
batch = txindex.NewBatch(e.NumTxs)
|
||||
|
||||
for i := range b.Data.Txs {
|
||||
tr := abcitypes.TxResult{
|
||||
Height: b.Height,
|
||||
Index: uint32(i),
|
||||
Tx: b.Data.Txs[i],
|
||||
Result: *(r.DeliverTxs[i]),
|
||||
}
|
||||
|
||||
if err = batch.Add(&tr); err != nil {
|
||||
return fmt.Errorf("adding tx to batch: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := args.txIndexer.AddBatch(batch); err != nil {
|
||||
return fmt.Errorf("tx event re-index at height %d failed: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := args.blockIndexer.Index(e); err != nil {
|
||||
return fmt.Errorf("block event re-index at height %d failed: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
bar.Play(i)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkValidHeight(bs state.BlockStore) error {
|
||||
base := bs.Base()
|
||||
|
||||
if startHeight == 0 {
|
||||
startHeight = base
|
||||
fmt.Printf("set the start block height to the base height of the blockstore %d \n", base)
|
||||
}
|
||||
|
||||
if startHeight < base {
|
||||
return fmt.Errorf("%s (requested start height: %d, base height: %d)",
|
||||
ErrHeightNotAvailable, startHeight, base)
|
||||
}
|
||||
|
||||
height := bs.Height()
|
||||
|
||||
if startHeight > height {
|
||||
return fmt.Errorf(
|
||||
"%s (requested start height: %d, store height: %d)", ErrHeightNotAvailable, startHeight, height)
|
||||
}
|
||||
|
||||
if endHeight == 0 || endHeight > height {
|
||||
endHeight = height
|
||||
fmt.Printf("set the end block height to the latest height of the blockstore %d \n", height)
|
||||
}
|
||||
|
||||
if endHeight < base {
|
||||
return fmt.Errorf(
|
||||
"%s (requested end height: %d, base height: %d)", ErrHeightNotAvailable, endHeight, base)
|
||||
}
|
||||
|
||||
if endHeight < startHeight {
|
||||
return fmt.Errorf(
|
||||
"%s (requested the end height: %d is less than the start height: %d)",
|
||||
ErrInvalidRequest, startHeight, endHeight)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
192
cmd/tendermint/commands/reindex_event_test.go
Normal file
192
cmd/tendermint/commands/reindex_event_test.go
Normal file
@@ -0,0 +1,192 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abcitypes "github.com/tendermint/tendermint/abci/types"
|
||||
tmcfg "github.com/tendermint/tendermint/config"
|
||||
prototmstate "github.com/tendermint/tendermint/proto/tendermint/state"
|
||||
blockmocks "github.com/tendermint/tendermint/state/indexer/mocks"
|
||||
"github.com/tendermint/tendermint/state/mocks"
|
||||
txmocks "github.com/tendermint/tendermint/state/txindex/mocks"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
)
|
||||
|
||||
const (
|
||||
height int64 = 10
|
||||
base int64 = 2
|
||||
)
|
||||
|
||||
func setupReIndexEventCmd() *cobra.Command {
|
||||
reIndexEventCmd := &cobra.Command{
|
||||
Use: ReIndexEventCmd.Use,
|
||||
Run: func(cmd *cobra.Command, args []string) {},
|
||||
}
|
||||
|
||||
_ = reIndexEventCmd.ExecuteContext(context.Background())
|
||||
|
||||
return reIndexEventCmd
|
||||
}
|
||||
|
||||
func TestReIndexEventCheckHeight(t *testing.T) {
|
||||
mockBlockStore := &mocks.BlockStore{}
|
||||
mockBlockStore.
|
||||
On("Base").Return(base).
|
||||
On("Height").Return(height)
|
||||
|
||||
testCases := []struct {
|
||||
startHeight int64
|
||||
endHeight int64
|
||||
validHeight bool
|
||||
}{
|
||||
{0, 0, true},
|
||||
{0, base, true},
|
||||
{0, base - 1, false},
|
||||
{0, height, true},
|
||||
{0, height + 1, true},
|
||||
{0, 0, true},
|
||||
{base - 1, 0, false},
|
||||
{base, 0, true},
|
||||
{base, base, true},
|
||||
{base, base - 1, false},
|
||||
{base, height, true},
|
||||
{base, height + 1, true},
|
||||
{height, 0, true},
|
||||
{height, base, false},
|
||||
{height, height - 1, false},
|
||||
{height, height, true},
|
||||
{height, height + 1, true},
|
||||
{height + 1, 0, false},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
startHeight = tc.startHeight
|
||||
endHeight = tc.endHeight
|
||||
|
||||
err := checkValidHeight(mockBlockStore)
|
||||
if tc.validHeight {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadEventSink(t *testing.T) {
|
||||
testCases := []struct {
|
||||
sinks string
|
||||
connURL string
|
||||
loadErr bool
|
||||
}{
|
||||
{"", "", true},
|
||||
{"NULL", "", true},
|
||||
{"KV", "", false},
|
||||
{"PSQL", "", true}, // true because empty connect url
|
||||
// skip to test PSQL connect with correct url
|
||||
{"UnsupportedSinkType", "wrongUrl", true},
|
||||
}
|
||||
|
||||
for idx, tc := range testCases {
|
||||
cfg := tmcfg.TestConfig()
|
||||
cfg.TxIndex.Indexer = tc.sinks
|
||||
cfg.TxIndex.PsqlConn = tc.connURL
|
||||
_, _, err := loadEventSinks(cfg)
|
||||
if tc.loadErr {
|
||||
require.Error(t, err, idx)
|
||||
} else {
|
||||
require.NoError(t, err, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBlockStore(t *testing.T) {
|
||||
cfg := tmcfg.TestConfig()
|
||||
cfg.DBPath = t.TempDir()
|
||||
_, _, err := loadStateAndBlockStore(cfg)
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = dbm.NewDB("blockstore", dbm.GoLevelDBBackend, cfg.DBDir())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get StateStore
|
||||
_, err = dbm.NewDB("state", dbm.GoLevelDBBackend, cfg.DBDir())
|
||||
require.NoError(t, err)
|
||||
|
||||
bs, ss, err := loadStateAndBlockStore(cfg)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, bs)
|
||||
require.NotNil(t, ss)
|
||||
|
||||
}
|
||||
|
||||
func TestReIndexEvent(t *testing.T) {
|
||||
mockBlockStore := &mocks.BlockStore{}
|
||||
mockStateStore := &mocks.Store{}
|
||||
mockBlockIndexer := &blockmocks.BlockIndexer{}
|
||||
mockTxIndexer := &txmocks.TxIndexer{}
|
||||
|
||||
mockBlockStore.
|
||||
On("Base").Return(base).
|
||||
On("Height").Return(height).
|
||||
On("LoadBlock", base).Return(nil).Once().
|
||||
On("LoadBlock", base).Return(&types.Block{Data: types.Data{Txs: types.Txs{make(types.Tx, 1)}}}).
|
||||
On("LoadBlock", height).Return(&types.Block{Data: types.Data{Txs: types.Txs{make(types.Tx, 1)}}})
|
||||
|
||||
dtx := abcitypes.ResponseDeliverTx{}
|
||||
abciResp := &prototmstate.ABCIResponses{
|
||||
DeliverTxs: []*abcitypes.ResponseDeliverTx{&dtx},
|
||||
EndBlock: &abcitypes.ResponseEndBlock{},
|
||||
BeginBlock: &abcitypes.ResponseBeginBlock{},
|
||||
}
|
||||
|
||||
mockBlockIndexer.
|
||||
On("Index", mock.AnythingOfType("types.EventDataNewBlockHeader")).Return(errors.New("")).Once().
|
||||
On("Index", mock.AnythingOfType("types.EventDataNewBlockHeader")).Return(nil)
|
||||
|
||||
mockTxIndexer.
|
||||
On("AddBatch", mock.AnythingOfType("*txindex.Batch")).Return(errors.New("")).Once().
|
||||
On("AddBatch", mock.AnythingOfType("*txindex.Batch")).Return(nil)
|
||||
|
||||
mockStateStore.
|
||||
On("LoadABCIResponses", base).Return(nil, errors.New("")).Once().
|
||||
On("LoadABCIResponses", base).Return(abciResp, nil).
|
||||
On("LoadABCIResponses", height).Return(abciResp, nil)
|
||||
|
||||
testCases := []struct {
|
||||
startHeight int64
|
||||
endHeight int64
|
||||
reIndexErr bool
|
||||
}{
|
||||
{base, height, true}, // LoadBlock error
|
||||
{base, height, true}, // LoadABCIResponses error
|
||||
{base, height, true}, // index block event error
|
||||
{base, height, true}, // index tx event error
|
||||
{base, base, false},
|
||||
{height, height, false},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
args := eventReIndexArgs{
|
||||
startHeight: tc.startHeight,
|
||||
endHeight: tc.endHeight,
|
||||
blockIndexer: mockBlockIndexer,
|
||||
txIndexer: mockTxIndexer,
|
||||
blockStore: mockBlockStore,
|
||||
stateStore: mockStateStore,
|
||||
}
|
||||
|
||||
err := eventReIndex(setupReIndexEventCmd(), args)
|
||||
if tc.reIndexErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,12 +2,14 @@ package commands
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/os"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
)
|
||||
@@ -55,6 +57,10 @@ func RollbackState(config *cfg.Config) (int64, []byte, error) {
|
||||
func loadStateAndBlockStore(config *cfg.Config) (*store.BlockStore, state.Store, error) {
|
||||
dbType := dbm.BackendType(config.DBBackend)
|
||||
|
||||
if !os.FileExists(filepath.Join(config.DBDir(), "blockstore.db")) {
|
||||
return nil, nil, fmt.Errorf("no blockstore found in %v", config.DBDir())
|
||||
}
|
||||
|
||||
// Get BlockStore
|
||||
blockStoreDB, err := dbm.NewDB("blockstore", dbType, config.DBDir())
|
||||
if err != nil {
|
||||
@@ -62,6 +68,10 @@ func loadStateAndBlockStore(config *cfg.Config) (*store.BlockStore, state.Store,
|
||||
}
|
||||
blockStore := store.NewBlockStore(blockStoreDB)
|
||||
|
||||
if !os.FileExists(filepath.Join(config.DBDir(), "state.db")) {
|
||||
return nil, nil, fmt.Errorf("no statestore found in %v", config.DBDir())
|
||||
}
|
||||
|
||||
// Get StateStore
|
||||
stateDB, err := dbm.NewDB("state", dbType, config.DBDir())
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user