abci: implement finalize block (#9468)

Adds the `FinalizeBlock` method which replaces `BeginBlock`, `DeliverTx`, and `EndBlock` in a single call.
This commit is contained in:
Callum Waters
2022-11-28 23:12:28 +01:00
committed by GitHub
parent 001cd50fc7
commit c5c2aafad2
142 changed files with 6717 additions and 8420 deletions

View File

@@ -1,7 +1,7 @@
package state
import (
"errors"
"context"
"fmt"
"time"
@@ -10,7 +10,6 @@ import (
"github.com/tendermint/tendermint/libs/fail"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -117,8 +116,8 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
block := state.MakeBlock(height, txs, commit, evidence, proposerAddr)
localLastCommit := buildLastCommitInfo(block, blockExec.store, state.InitialHeight)
rpp, err := blockExec.proxyApp.PrepareProposalSync(
abci.RequestPrepareProposal{
rpp, err := blockExec.proxyApp.PrepareProposal(context.TODO(),
&abci.RequestPrepareProposal{
MaxTxBytes: maxDataBytes,
Txs: block.Txs.ToSliceOfBytes(),
LocalLastCommit: extendedCommitInfo(localLastCommit, votes),
@@ -153,7 +152,7 @@ func (blockExec *BlockExecutor) ProcessProposal(
block *types.Block,
state State,
) (bool, error) {
resp, err := blockExec.proxyApp.ProcessProposalSync(abci.RequestProcessProposal{
resp, err := blockExec.proxyApp.ProcessProposal(context.TODO(), &abci.RequestProcessProposal{
Hash: block.Header.Hash(),
Height: block.Header.Height,
Time: block.Header.Time,
@@ -199,33 +198,48 @@ func (blockExec *BlockExecutor) ApplyBlock(
return state, ErrInvalidBlock(err)
}
commitInfo := buildLastCommitInfo(block, blockExec.store, state.InitialHeight)
startTime := time.Now().UnixNano()
abciResponses, err := execBlockOnProxyApp(
blockExec.logger, blockExec.proxyApp, block, blockExec.store, state.InitialHeight,
)
abciResponse, err := blockExec.proxyApp.FinalizeBlock(context.TODO(), &abci.RequestFinalizeBlock{
Hash: block.Hash(),
NextValidatorsHash: block.NextValidatorsHash,
ProposerAddress: block.ProposerAddress,
Height: block.Height,
DecidedLastCommit: commitInfo,
Misbehavior: block.Evidence.Evidence.ToABCI(),
Txs: block.Txs.ToSliceOfBytes(),
})
endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
if err != nil {
return state, ErrProxyAppConn(err)
blockExec.logger.Error("error in proxyAppConn.FinalizeBlock", "err", err)
return state, err
}
// Assert that the application correctly returned tx results for each of the transactions provided in the block
if len(block.Data.Txs) != len(abciResponse.TxResults) {
return state, fmt.Errorf("expected tx results length to match size of transactions in block. Expected %d, got %d", len(block.Data.Txs), len(abciResponse.TxResults))
}
blockExec.logger.Info("executed block", "height", block.Height, "agreed_app_data", abciResponse.AgreedAppData)
fail.Fail() // XXX
// Save the results before we commit.
if err := blockExec.store.SaveABCIResponses(block.Height, abciResponses); err != nil {
if err := blockExec.store.SaveFinalizeBlockResponse(block.Height, abciResponse); err != nil {
return state, err
}
fail.Fail() // XXX
// validate the validator updates and convert to tendermint types
abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
err = validateValidatorUpdates(abciResponse.ValidatorUpdates, state.ConsensusParams.Validator)
if err != nil {
return state, fmt.Errorf("error in validator updates: %v", err)
}
validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciResponse.ValidatorUpdates)
if err != nil {
return state, err
}
@@ -233,18 +247,18 @@ func (blockExec *BlockExecutor) ApplyBlock(
blockExec.logger.Debug("updates to validators", "updates", types.ValidatorListString(validatorUpdates))
blockExec.metrics.ValidatorSetUpdates.Add(1)
}
if abciResponses.EndBlock.ConsensusParamUpdates != nil {
if abciResponse.ConsensusParamUpdates != nil {
blockExec.metrics.ConsensusParamUpdates.Add(1)
}
// Update the state with the block and responses.
state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
state, err = updateState(state, blockID, &block.Header, abciResponse, validatorUpdates)
if err != nil {
return state, fmt.Errorf("commit failed for application: %v", err)
}
// Lock mempool, commit app state, update mempoool.
appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs)
retainHeight, err := blockExec.Commit(state, block, abciResponse)
if err != nil {
return state, fmt.Errorf("commit failed for application: %v", err)
}
@@ -255,7 +269,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
fail.Fail() // XXX
// Update the app hash and save the state.
state.AppHash = appHash
state.AppHash = abciResponse.AgreedAppData
if err := blockExec.store.Save(state); err != nil {
return state, err
}
@@ -274,7 +288,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses, validatorUpdates)
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponse, validatorUpdates)
return state, nil
}
@@ -288,8 +302,8 @@ func (blockExec *BlockExecutor) ApplyBlock(
func (blockExec *BlockExecutor) Commit(
state State,
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, int64, error) {
abciResponse *abci.ResponseFinalizeBlock,
) (int64, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
@@ -298,114 +312,37 @@ func (blockExec *BlockExecutor) Commit(
err := blockExec.mempool.FlushAppConn()
if err != nil {
blockExec.logger.Error("client error during mempool.FlushAppConn", "err", err)
return nil, 0, err
return 0, err
}
// Commit block, get hash back
res, err := blockExec.proxyApp.CommitSync()
res, err := blockExec.proxyApp.Commit(context.TODO())
if err != nil {
blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err)
return nil, 0, err
return 0, err
}
// ResponseCommit has no error code - just data
blockExec.logger.Info(
"committed state",
"height", block.Height,
"num_txs", len(block.Txs),
"app_hash", fmt.Sprintf("%X", res.Data),
)
// Update mempool.
err = blockExec.mempool.Update(
block.Height,
block.Txs,
deliverTxResponses,
abciResponse.TxResults,
TxPreCheck(state),
TxPostCheck(state),
)
return res.Data, res.RetainHeight, err
return res.RetainHeight, err
}
//---------------------------------------------------------
// Helper functions for executing blocks and updating state
// Executes block's transactions on proxyAppConn.
// Returns a list of transaction results and updates to the validator set
func execBlockOnProxyApp(
logger log.Logger,
proxyAppConn proxy.AppConnConsensus,
block *types.Block,
store Store,
initialHeight int64,
) (*tmstate.ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0
txIndex := 0
abciResponses := new(tmstate.ABCIResponses)
dtxs := make([]*abci.ResponseDeliverTx, len(block.Txs))
abciResponses.DeliverTxs = dtxs
// Execute transactions and get hash.
proxyCb := func(req *abci.Request, res *abci.Response) {
if r, ok := res.Value.(*abci.Response_DeliverTx); ok {
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
txRes := r.DeliverTx
if txRes.Code == abci.CodeTypeOK {
validTxs++
} else {
logger.Debug("invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++
}
abciResponses.DeliverTxs[txIndex] = txRes
txIndex++
}
}
proxyAppConn.SetResponseCallback(proxyCb)
commitInfo := buildLastCommitInfo(block, store, initialHeight)
// Begin block
var err error
pbh := block.Header.ToProto()
if pbh == nil {
return nil, errors.New("nil header")
}
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(),
Header: *pbh,
LastCommitInfo: commitInfo,
ByzantineValidators: block.Evidence.Evidence.ToABCI(),
})
if err != nil {
logger.Error("error in proxyAppConn.BeginBlock", "err", err)
return nil, err
}
// run txs of block
for _, tx := range block.Txs {
proxyAppConn.DeliverTxAsync(abci.RequestDeliverTx{Tx: tx})
if err := proxyAppConn.Error(); err != nil {
return nil, err
}
}
// End block.
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
if err != nil {
logger.Error("error in proxyAppConn.EndBlock", "err", err)
return nil, err
}
logger.Info("executed block", "height", block.Height, "num_valid_txs", validTxs, "num_invalid_txs", invalidTxs)
return abciResponses, nil
}
func buildLastCommitInfo(block *types.Block, store Store, initialHeight int64) abci.CommitInfo {
if block.Height == initialHeight {
// there is no last commit for the initial height.
@@ -495,7 +432,7 @@ func updateState(
state State,
blockID types.BlockID,
header *types.Header,
abciResponses *tmstate.ABCIResponses,
abciResponse *abci.ResponseFinalizeBlock,
validatorUpdates []*types.Validator,
) (State, error) {
@@ -503,7 +440,7 @@ func updateState(
// and update s.LastValidators and s.Validators.
nValSet := state.NextValidators.Copy()
// Update the validator set with the latest abciResponses.
// Update the validator set with the latest abciResponse.
lastHeightValsChanged := state.LastHeightValidatorsChanged
if len(validatorUpdates) > 0 {
err := nValSet.UpdateWithChangeSet(validatorUpdates)
@@ -517,12 +454,12 @@ func updateState(
// Update validator proposer priority and set state variables.
nValSet.IncrementProposerPriority(1)
// Update the params with the latest abciResponses.
// Update the params with the latest abciResponse.
nextParams := state.ConsensusParams
lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
if abciResponses.EndBlock.ConsensusParamUpdates != nil {
if abciResponse.ConsensusParamUpdates != nil {
// NOTE: must not mutate s.ConsensusParams
nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
nextParams = state.ConsensusParams.Update(abciResponse.ConsensusParamUpdates)
err := nextParams.ValidateBasic()
if err != nil {
return state, fmt.Errorf("error updating consensus params: %v", err)
@@ -551,7 +488,7 @@ func updateState(
LastHeightValidatorsChanged: lastHeightValsChanged,
ConsensusParams: nextParams,
LastHeightConsensusParamsChanged: lastHeightParamsChanged,
LastResultsHash: ABCIResponsesResultsHash(abciResponses),
LastResultsHash: TxResultsHash(abciResponse.TxResults),
AppHash: nil,
}, nil
}
@@ -563,26 +500,32 @@ func fireEvents(
logger log.Logger,
eventBus types.BlockEventPublisher,
block *types.Block,
abciResponses *tmstate.ABCIResponses,
blockID types.BlockID,
abciResponse *abci.ResponseFinalizeBlock,
validatorUpdates []*types.Validator,
) {
if err := eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
Block: block,
BlockID: blockID,
ResultFinalizeBlock: *abciResponse,
}); err != nil {
logger.Error("failed publishing new block", "err", err)
}
if err := eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
NumTxs: int64(len(block.Txs)),
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
Header: block.Header,
}); err != nil {
logger.Error("failed publishing new block header", "err", err)
}
if err := eventBus.PublishEventNewBlockEvents(types.EventDataNewBlockEvents{
Height: block.Height,
Events: abciResponse.Events,
NumTxs: int64(len(block.Txs)),
}); err != nil {
logger.Error("failed publishing new block events", "err", err)
}
if len(block.Evidence.Evidence) != 0 {
for _, ev := range block.Evidence.Evidence {
if err := eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{
@@ -599,7 +542,7 @@ func fireEvents(
Height: block.Height,
Index: uint32(i),
Tx: tx,
Result: *(abciResponses.DeliverTxs[i]),
Result: *(abciResponse.TxResults[i]),
}}); err != nil {
logger.Error("failed publishing event TX", "err", err)
}
@@ -625,21 +568,37 @@ func ExecCommitBlock(
store Store,
initialHeight int64,
) ([]byte, error) {
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight)
commitInfo := buildLastCommitInfo(block, store, initialHeight)
resp, err := appConnConsensus.FinalizeBlock(context.TODO(), &abci.RequestFinalizeBlock{
Hash: block.Hash(),
NextValidatorsHash: block.NextValidatorsHash,
ProposerAddress: block.ProposerAddress,
Height: block.Height,
DecidedLastCommit: commitInfo,
Misbehavior: block.Evidence.Evidence.ToABCI(),
Txs: block.Txs.ToSliceOfBytes(),
})
if err != nil {
logger.Error("failed executing block on proxy app", "height", block.Height, "err", err)
logger.Error("error in proxyAppConn.FinalizeBlock", "err", err)
return nil, err
}
// Assert that the application correctly returned tx results for each of the transactions provided in the block
if len(block.Data.Txs) != len(resp.TxResults) {
return nil, fmt.Errorf("expected tx results length to match size of transactions in block. Expected %d, got %d", len(block.Data.Txs), len(resp.TxResults))
}
logger.Info("executed block", "height", block.Height, "agreed_app_data", resp.AgreedAppData)
// Commit block, get hash back
res, err := appConnConsensus.CommitSync()
_, err = appConnConsensus.Commit(context.TODO())
if err != nil {
logger.Error("client error during proxyAppConn.CommitSync", "err", res)
logger.Error("client error during proxyAppConn.CommitSync", "err", err)
return nil, err
}
// ResponseCommit has no error or log, just data
return res.Data, nil
return resp.AgreedAppData, nil
}
func (blockExec *BlockExecutor) pruneBlocks(retainHeight int64, state State) (uint64, error) {