state: define interface for state store (#5348)

## Description

Make an interface for the state store. 

Closes: #5213
This commit is contained in:
Marko
2020-09-15 09:45:48 +02:00
committed by GitHub
parent bf6bd2cefa
commit 56911ee352
37 changed files with 495 additions and 327 deletions

View File

@@ -1,6 +1,7 @@
package state
import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
@@ -36,10 +37,52 @@ func calcABCIResponsesKey(height int64) []byte {
return []byte(fmt.Sprintf("abciResponsesKey:%v", height))
}
//----------------------
type Store interface {
// LoadFromDBOrGenesisFile loads the most recent state.
// If the chain is new it will use the genesis file from the provided genesis file path as the current state.
LoadFromDBOrGenesisFile(string) (State, error)
// LoadFromDBOrGenesisDoc loads the most recent state.
// If the chain is new it will use the genesis doc as the current state.
LoadFromDBOrGenesisDoc(*types.GenesisDoc) (State, error)
// Load loads the current state of the blockchain
Load() (State, error)
// LoadValidators loads the validator set at a given height
LoadValidators(int64) (*types.ValidatorSet, error)
// LoadABCIResponses loads the abciResponse for a given height
LoadABCIResponses(int64) (*tmstate.ABCIResponses, error)
// LoadConsensusParams loads the consensus params for a given height
LoadConsensusParams(int64) (tmproto.ConsensusParams, error)
// Save overwrites the previous state with the updated one
Save(State) error
// SaveABCIResponses saves ABCIResponses for a given height
SaveABCIResponses(int64, *tmstate.ABCIResponses) error
// Bootstrap is used for bootstrapping state when not starting from a initial height.
Bootstrap(State) error
// PruneStates takes the height from which to start prning and which height stop at
PruneStates(int64, int64) error
}
//dbStore wraps a db (github.com/tendermint/tm-db)
type dbStore struct {
db dbm.DB
}
var _ Store = (*dbStore)(nil)
// NewStore creates the dbStore of the state pkg.
func NewStore(db dbm.DB) Store {
return dbStore{db}
}
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
// or creates a new one from the given genesisFilePath.
func LoadStateFromDBOrGenesisFile(stateDB dbm.DB, genesisFilePath string) (State, error) {
state := LoadState(stateDB)
func (store dbStore) LoadFromDBOrGenesisFile(genesisFilePath string) (State, error) {
state, err := store.Load()
if err != nil {
return State{}, err
}
if state.IsEmpty() {
var err error
state, err = MakeGenesisStateFromFile(genesisFilePath)
@@ -53,8 +96,11 @@ func LoadStateFromDBOrGenesisFile(stateDB dbm.DB, genesisFilePath string) (State
// LoadStateFromDBOrGenesisDoc loads the most recent state from the database,
// or creates a new one from the given genesisDoc.
func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) (State, error) {
state := LoadState(stateDB)
func (store dbStore) LoadFromDBOrGenesisDoc(genesisDoc *types.GenesisDoc) (State, error) {
state, err := store.Load()
if err != nil {
return State{}, err
}
if state.IsEmpty() {
var err error
@@ -68,17 +114,17 @@ func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) (
}
// LoadState loads the State from the database.
func LoadState(db dbm.DB) State {
return loadState(db, stateKey)
func (store dbStore) Load() (State, error) {
return store.loadState(stateKey)
}
func loadState(db dbm.DB, key []byte) (state State) {
buf, err := db.Get(key)
func (store dbStore) loadState(key []byte) (state State, err error) {
buf, err := store.db.Get(key)
if err != nil {
panic(err)
return state, err
}
if len(buf) == 0 {
return state
return state, nil
}
sp := new(tmstate.State)
@@ -92,51 +138,72 @@ func loadState(db dbm.DB, key []byte) (state State) {
sm, err := StateFromProto(sp)
if err != nil {
panic(err)
return state, err
}
return *sm
return *sm, nil
}
// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
// Save persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
// This flushes the writes (e.g. calls SetSync).
func SaveState(db dbm.DB, state State) {
saveState(db, state, stateKey)
func (store dbStore) Save(state State) error {
return store.save(state, stateKey)
}
func saveState(db dbm.DB, state State, key []byte) {
func (store dbStore) save(state State, key []byte) error {
nextHeight := state.LastBlockHeight + 1
// If first block, save validators for the block.
if nextHeight == 1 {
nextHeight = state.InitialHeight
// This extra logic due to Tendermint validator set changes being delayed 1 block.
// It may get overwritten due to InitChain validator updates.
saveValidatorsInfo(db, nextHeight, nextHeight, state.Validators)
if err := store.saveValidatorsInfo(nextHeight, nextHeight, state.Validators); err != nil {
return err
}
}
// Save next validators.
saveValidatorsInfo(db, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators)
if err := store.saveValidatorsInfo(nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators); err != nil {
return err
}
// Save next consensus params.
saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams)
err := db.SetSync(key, state.Bytes())
if err != nil {
panic(err)
if err := store.saveConsensusParamsInfo(nextHeight,
state.LastHeightConsensusParamsChanged, state.ConsensusParams); err != nil {
return err
}
err := store.db.SetSync(key, state.Bytes())
if err != nil {
return err
}
return nil
}
// BootstrapState saves a new state, used e.g. by state sync when starting from non-zero height.
func BootstrapState(db dbm.DB, state State) error {
func (store dbStore) Bootstrap(state State) error {
height := state.LastBlockHeight + 1
if height == 1 {
height = state.InitialHeight
}
if height > 1 && !state.LastValidators.IsNilOrEmpty() {
saveValidatorsInfo(db, height-1, height-1, state.LastValidators)
if err := store.saveValidatorsInfo(height-1, height-1, state.LastValidators); err != nil {
return err
}
}
saveValidatorsInfo(db, height, height, state.Validators)
saveValidatorsInfo(db, height+1, height+1, state.NextValidators)
saveConsensusParamsInfo(db, height, height, state.ConsensusParams)
return db.SetSync(stateKey, state.Bytes())
if err := store.saveValidatorsInfo(height, height, state.Validators); err != nil {
return err
}
if err := store.saveValidatorsInfo(height+1, height+1, state.NextValidators); err != nil {
return err
}
if err := store.saveConsensusParamsInfo(height, height, state.ConsensusParams); err != nil {
return err
}
return store.db.SetSync(stateKey, state.Bytes())
}
// PruneStates deletes states between the given heights (including from, excluding to). It is not
@@ -147,20 +214,20 @@ func BootstrapState(db dbm.DB, state State) error {
// encoding not preserving ordering: https://github.com/tendermint/tendermint/issues/4567
// This will cause some old states to be left behind when doing incremental partial prunes,
// specifically older checkpoints and LastHeightChanged targets.
func PruneStates(db dbm.DB, from int64, to int64) error {
func (store dbStore) PruneStates(from int64, to int64) error {
if from <= 0 || to <= 0 {
return fmt.Errorf("from height %v and to height %v must be greater than 0", from, to)
}
if from >= to {
return fmt.Errorf("from height %v must be lower than to height %v", from, to)
}
valInfo := loadValidatorsInfo(db, to)
if valInfo == nil {
return fmt.Errorf("validators at height %v not found", to)
valInfo, err := loadValidatorsInfo(store.db, to)
if err != nil {
return fmt.Errorf("validators at height %v not found: %w", to, err)
}
paramsInfo := loadConsensusParamsInfo(db, to)
if paramsInfo == nil {
return fmt.Errorf("consensus params at height %v not found", to)
paramsInfo, err := store.loadConsensusParamsInfo(to)
if err != nil {
return fmt.Errorf("consensus params at height %v not found: %w", to, err)
}
keepVals := make(map[int64]bool)
@@ -173,10 +240,9 @@ func PruneStates(db dbm.DB, from int64, to int64) error {
keepParams[paramsInfo.LastHeightChanged] = true
}
batch := db.NewBatch()
batch := store.db.NewBatch()
defer batch.Close()
pruned := uint64(0)
var err error
// We have to delete in reverse order, to avoid deleting previous heights that have validator
// sets and consensus params that we may need to retrieve.
@@ -185,10 +251,9 @@ func PruneStates(db dbm.DB, from int64, to int64) error {
// params, otherwise they will panic if they're retrieved directly (instead of
// indirectly via a LastHeightChanged pointer).
if keepVals[h] {
v := loadValidatorsInfo(db, h)
if v.ValidatorSet == nil {
vip, err := LoadValidators(db, h)
v, err := loadValidatorsInfo(store.db, h)
if err != nil || v.ValidatorSet == nil {
vip, err := store.LoadValidators(h)
if err != nil {
return err
}
@@ -218,17 +283,23 @@ func PruneStates(db dbm.DB, from int64, to int64) error {
}
if keepParams[h] {
p := loadConsensusParamsInfo(db, h)
p, err := store.loadConsensusParamsInfo(h)
if err != nil {
return err
}
if p.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
p.ConsensusParams, err = LoadConsensusParams(db, h)
p.ConsensusParams, err = store.LoadConsensusParams(h)
if err != nil {
return err
}
p.LastHeightChanged = h
bz, err := p.Marshal()
if err != nil {
return err
}
err = batch.Set(calcConsensusParamsKey(h), bz)
if err != nil {
return err
@@ -254,7 +325,7 @@ func PruneStates(db dbm.DB, from int64, to int64) error {
return err
}
batch.Close()
batch = db.NewBatch()
batch = store.db.NewBatch()
defer batch.Close()
}
}
@@ -283,8 +354,8 @@ func ABCIResponsesResultsHash(ar *tmstate.ABCIResponses) []byte {
// This is useful for recovering from crashes where we called app.Commit and
// before we called s.Save(). It can also be used to produce Merkle proofs of
// the result of txs.
func LoadABCIResponses(db dbm.DB, height int64) (*tmstate.ABCIResponses, error) {
buf, err := db.Get(calcABCIResponsesKey(height))
func (store dbStore) LoadABCIResponses(height int64) (*tmstate.ABCIResponses, error) {
buf, err := store.db.Get(calcABCIResponsesKey(height))
if err != nil {
return nil, err
}
@@ -311,7 +382,7 @@ func LoadABCIResponses(db dbm.DB, height int64) (*tmstate.ABCIResponses, error)
// Merkle proofs.
//
// Exposed for testing.
func SaveABCIResponses(db dbm.DB, height int64, abciResponses *tmstate.ABCIResponses) {
func (store dbStore) SaveABCIResponses(height int64, abciResponses *tmstate.ABCIResponses) error {
var dtxs []*abci.ResponseDeliverTx
//strip nil values,
for _, tx := range abciResponses.DeliverTxs {
@@ -323,33 +394,36 @@ func SaveABCIResponses(db dbm.DB, height int64, abciResponses *tmstate.ABCIRespo
bz, err := abciResponses.Marshal()
if err != nil {
panic(err)
return err
}
err = db.SetSync(calcABCIResponsesKey(height), bz)
err = store.db.SetSync(calcABCIResponsesKey(height), bz)
if err != nil {
panic(err)
return err
}
return nil
}
//-----------------------------------------------------------------------------
// LoadValidators loads the ValidatorSet for a given height.
// Returns ErrNoValSetForHeight if the validator set can't be found for this height.
func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) {
valInfo := loadValidatorsInfo(db, height)
if valInfo == nil {
func (store dbStore) LoadValidators(height int64) (*types.ValidatorSet, error) {
valInfo, err := loadValidatorsInfo(store.db, height)
if err != nil {
return nil, ErrNoValSetForHeight{height}
}
if valInfo.ValidatorSet == nil {
lastStoredHeight := lastStoredHeightFor(height, valInfo.LastHeightChanged)
valInfo2 := loadValidatorsInfo(db, lastStoredHeight)
if valInfo2 == nil || valInfo2.ValidatorSet == nil {
panic(
fmt.Sprintf("Couldn't find validators at height %d (height %d was originally requested)",
valInfo2, err := loadValidatorsInfo(store.db, lastStoredHeight)
if err != nil || valInfo2.ValidatorSet == nil {
return nil,
fmt.Errorf("couldn't find validators at height %d (height %d was originally requested): %w",
lastStoredHeight,
height,
),
)
err,
)
}
vs, err := types.ValidatorSetFromProto(valInfo2.ValidatorSet)
@@ -381,14 +455,14 @@ func lastStoredHeightFor(height, lastHeightChanged int64) int64 {
}
// CONTRACT: Returned ValidatorsInfo can be mutated.
func loadValidatorsInfo(db dbm.DB, height int64) *tmstate.ValidatorsInfo {
func loadValidatorsInfo(db dbm.DB, height int64) (*tmstate.ValidatorsInfo, error) {
buf, err := db.Get(calcValidatorsKey(height))
if err != nil {
panic(err)
return nil, err
}
if len(buf) == 0 {
return nil
return nil, errors.New("value retrieved from db is empty")
}
v := new(tmstate.ValidatorsInfo)
@@ -400,7 +474,7 @@ func loadValidatorsInfo(db dbm.DB, height int64) *tmstate.ValidatorsInfo {
}
// TODO: ensure that buf is completely read.
return v
return v, nil
}
// saveValidatorsInfo persists the validator set.
@@ -408,9 +482,9 @@ func loadValidatorsInfo(db dbm.DB, height int64) *tmstate.ValidatorsInfo {
// `height` is the effective height for which the validator is responsible for
// signing. It should be called from s.Save(), right before the state itself is
// persisted.
func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) {
func (store dbStore) saveValidatorsInfo(height, lastHeightChanged int64, valSet *types.ValidatorSet) error {
if lastHeightChanged > height {
panic("LastHeightChanged cannot be greater than ValidatorsInfo height")
return errors.New("lastHeightChanged cannot be greater than ValidatorsInfo height")
}
valInfo := &tmstate.ValidatorsInfo{
LastHeightChanged: lastHeightChanged,
@@ -420,20 +494,22 @@ func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *type
if height == lastHeightChanged || height%valSetCheckpointInterval == 0 {
pv, err := valSet.ToProto()
if err != nil {
panic(err)
return err
}
valInfo.ValidatorSet = pv
}
bz, err := valInfo.Marshal()
if err != nil {
panic(err)
return err
}
err = db.Set(calcValidatorsKey(height), bz)
err = store.db.Set(calcValidatorsKey(height), bz)
if err != nil {
panic(err)
return err
}
return nil
}
//-----------------------------------------------------------------------------
@@ -441,23 +517,22 @@ func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *type
// ConsensusParamsInfo represents the latest consensus params, or the last height it changed
// LoadConsensusParams loads the ConsensusParams for a given height.
func LoadConsensusParams(db dbm.DB, height int64) (tmproto.ConsensusParams, error) {
func (store dbStore) LoadConsensusParams(height int64) (tmproto.ConsensusParams, error) {
empty := tmproto.ConsensusParams{}
paramsInfo := loadConsensusParamsInfo(db, height)
if paramsInfo == nil {
return empty, ErrNoConsensusParamsForHeight{height}
paramsInfo, err := store.loadConsensusParamsInfo(height)
if err != nil {
return empty, fmt.Errorf("could not find consensus params for height #%d: %w", height, err)
}
if paramsInfo.ConsensusParams.Equal(&empty) {
paramsInfo2 := loadConsensusParamsInfo(db, paramsInfo.LastHeightChanged)
if paramsInfo2 == nil {
panic(
fmt.Sprintf(
"Couldn't find consensus params at height %d as last changed from height %d",
paramsInfo.LastHeightChanged,
height,
),
paramsInfo2, err := store.loadConsensusParamsInfo(paramsInfo.LastHeightChanged)
if err != nil {
return empty, fmt.Errorf(
"couldn't find consensus params at height %d as last changed from height %d: %w",
paramsInfo.LastHeightChanged,
height,
err,
)
}
@@ -467,13 +542,13 @@ func LoadConsensusParams(db dbm.DB, height int64) (tmproto.ConsensusParams, erro
return paramsInfo.ConsensusParams, nil
}
func loadConsensusParamsInfo(db dbm.DB, height int64) *tmstate.ConsensusParamsInfo {
buf, err := db.Get(calcConsensusParamsKey(height))
func (store dbStore) loadConsensusParamsInfo(height int64) (*tmstate.ConsensusParamsInfo, error) {
buf, err := store.db.Get(calcConsensusParamsKey(height))
if err != nil {
panic(err)
return nil, err
}
if len(buf) == 0 {
return nil
return nil, errors.New("value retrieved from db is empty")
}
paramsInfo := new(tmstate.ConsensusParamsInfo)
@@ -484,14 +559,14 @@ func loadConsensusParamsInfo(db dbm.DB, height int64) *tmstate.ConsensusParamsIn
}
// TODO: ensure that buf is completely read.
return paramsInfo
return paramsInfo, nil
}
// saveConsensusParamsInfo persists the consensus params for the next block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
// If the consensus params did not change after processing the latest block,
// only the last height for which they changed is persisted.
func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params tmproto.ConsensusParams) {
func (store dbStore) saveConsensusParamsInfo(nextHeight, changeHeight int64, params tmproto.ConsensusParams) error {
paramsInfo := &tmstate.ConsensusParamsInfo{
LastHeightChanged: changeHeight,
}
@@ -501,11 +576,13 @@ func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params t
}
bz, err := paramsInfo.Marshal()
if err != nil {
panic(err)
return err
}
err = db.Set(calcConsensusParamsKey(nextHeight), bz)
err = store.db.Set(calcConsensusParamsKey(nextHeight), bz)
if err != nil {
panic(err)
return err
}
return nil
}