state: move pruneBlocks from consensus/state to state/execution (#9443)

This commit is contained in:
JayT106
2022-09-20 05:30:22 -04:00
committed by GitHub
parent a8efef1854
commit db26cff58f
17 changed files with 123 additions and 83 deletions

View File

@@ -25,6 +25,9 @@ type BlockExecutor struct {
// save state, validators, consensus params, abci responses here
store Store
// use blockstore for the pruning functions.
blockStore BlockStore
// execute the app against this
proxyApp proxy.AppConnConsensus
@@ -57,16 +60,18 @@ func NewBlockExecutor(
proxyApp proxy.AppConnConsensus,
mempool mempool.Mempool,
evpool EvidencePool,
blockStore BlockStore,
options ...BlockExecutorOption,
) *BlockExecutor {
res := &BlockExecutor{
store: stateStore,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
store: stateStore,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
blockStore: blockStore,
}
for _, option := range options {
@@ -182,16 +187,16 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e
// ApplyBlock validates the block against the state, executes it against the app,
// fires the relevant events, commits the app, and saves the new state and responses.
// It returns the new state and the block height to retain (pruning older blocks).
// It returns the new state.
// It's the only function that needs to be called
// from outside this package to process and commit an entire block.
// It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock(
state State, blockID types.BlockID, block *types.Block,
) (State, int64, error) {
) (State, error) {
if err := validateBlock(state, block); err != nil {
return state, 0, ErrInvalidBlock(err)
return state, ErrInvalidBlock(err)
}
startTime := time.Now().UnixNano()
@@ -201,14 +206,14 @@ func (blockExec *BlockExecutor) ApplyBlock(
endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
if err != nil {
return state, 0, ErrProxyAppConn(err)
return state, ErrProxyAppConn(err)
}
fail.Fail() // XXX
// Save the results before we commit.
if err := blockExec.store.SaveABCIResponses(block.Height, abciResponses); err != nil {
return state, 0, err
return state, err
}
fail.Fail() // XXX
@@ -217,12 +222,12 @@ func (blockExec *BlockExecutor) ApplyBlock(
abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
if err != nil {
return state, 0, fmt.Errorf("error in validator updates: %v", err)
return state, fmt.Errorf("error in validator updates: %v", err)
}
validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
if err != nil {
return state, 0, err
return state, err
}
if len(validatorUpdates) > 0 {
blockExec.logger.Debug("updates to validators", "updates", types.ValidatorListString(validatorUpdates))
@@ -235,13 +240,13 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Update the state with the block and responses.
state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
if err != nil {
return state, 0, fmt.Errorf("commit failed for application: %v", err)
return state, fmt.Errorf("commit failed for application: %v", err)
}
// Lock mempool, commit app state, update mempoool.
appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs)
if err != nil {
return state, 0, fmt.Errorf("commit failed for application: %v", err)
return state, fmt.Errorf("commit failed for application: %v", err)
}
// Update evpool with the latest state.
@@ -252,16 +257,26 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Update the app hash and save the state.
state.AppHash = appHash
if err := blockExec.store.Save(state); err != nil {
return state, 0, err
return state, err
}
fail.Fail() // XXX
// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := blockExec.pruneBlocks(retainHeight)
if err != nil {
blockExec.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
blockExec.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
// Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses, validatorUpdates)
return state, retainHeight, nil
return state, nil
}
// Commit locks the mempool, runs the ABCI Commit message, and updates the
@@ -626,3 +641,20 @@ func ExecCommitBlock(
// ResponseCommit has no error or log, just data
return res.Data, nil
}
func (blockExec *BlockExecutor) pruneBlocks(retainHeight int64) (uint64, error) {
base := blockExec.blockStore.Base()
if retainHeight <= base {
return 0, nil
}
pruned, err := blockExec.blockStore.PruneBlocks(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}
err = blockExec.Store().PruneStates(base, retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune state store: %w", err)
}
return pruned, nil
}