abci: PrepareProposal (#6544)

This commit is contained in:
Marko
2021-07-27 07:30:34 +00:00
committed by GitHub
parent c3ae6f5b58
commit 7d30987cff
27 changed files with 1091 additions and 224 deletions

View File

@@ -46,6 +46,7 @@ type Client interface {
OfferSnapshotAsync(context.Context, types.RequestOfferSnapshot) (*ReqRes, error)
LoadSnapshotChunkAsync(context.Context, types.RequestLoadSnapshotChunk) (*ReqRes, error)
ApplySnapshotChunkAsync(context.Context, types.RequestApplySnapshotChunk) (*ReqRes, error)
PrepareProposalAsync(context.Context, types.RequestPrepareProposal) (*ReqRes, error)
// Synchronous requests
FlushSync(context.Context) error
@@ -62,6 +63,7 @@ type Client interface {
OfferSnapshotSync(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
PrepareProposalSync(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
}
//----------------------------------------

View File

@@ -314,6 +314,27 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(
)
}
func (cli *grpcClient) PrepareProposalAsync(
ctx context.Context,
params types.RequestPrepareProposal,
) (*ReqRes, error) {
req := types.ToRequestPrepareProposal(params)
res, err := cli.client.PrepareProposal(ctx, req.GetPrepareProposal(), grpc.WaitForReady(true))
if err != nil {
return nil, err
}
return cli.finishAsyncCall(
ctx,
req,
&types.Response{
Value: &types.Response_PrepareProposal{
PrepareProposal: res,
},
},
)
}
// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
@@ -504,3 +525,15 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
}
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
}
func (cli *grpcClient) PrepareProposalSync(
ctx context.Context,
params types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
reqres, err := cli.PrepareProposalAsync(ctx, params)
if err != nil {
return nil, err
}
return cli.finishSyncCall(reqres).GetPrepareProposal(), cli.Error()
}

View File

@@ -204,6 +204,20 @@ func (app *localClient) ApplySnapshotChunkAsync(
), nil
}
func (app *localClient) PrepareProposalAsync(
ctx context.Context,
req types.RequestPrepareProposal,
) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.PrepareProposal(req)
return app.callback(
types.ToRequestPrepareProposal(req),
types.ToResponsePrepareProposal(res),
), nil
}
//-------------------------------------------------------
func (app *localClient) FlushSync(ctx context.Context) error {
@@ -346,6 +360,18 @@ func (app *localClient) ApplySnapshotChunkSync(
return &res, nil
}
func (app *localClient) PrepareProposalSync(
ctx context.Context,
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.PrepareProposal(req)
return &res, nil
}
//-------------------------------------------------------
func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {

View File

@@ -669,6 +669,52 @@ func (_m *Client) OnStop() {
_m.Called()
}
// PrepareProposalAsync provides a mock function with given fields: _a0, _a1
func (_m *Client) PrepareProposalAsync(_a0 context.Context, _a1 types.RequestPrepareProposal) (*abcicli.ReqRes, error) {
ret := _m.Called(_a0, _a1)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(context.Context, types.RequestPrepareProposal) *abcicli.ReqRes); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestPrepareProposal) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PrepareProposalSync provides a mock function with given fields: _a0, _a1
func (_m *Client) PrepareProposalSync(_a0 context.Context, _a1 types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
ret := _m.Called(_a0, _a1)
var r0 *types.ResponsePrepareProposal
if rf, ok := ret.Get(0).(func(context.Context, types.RequestPrepareProposal) *types.ResponsePrepareProposal); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponsePrepareProposal)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestPrepareProposal) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// QueryAsync provides a mock function with given fields: _a0, _a1
func (_m *Client) QueryAsync(_a0 context.Context, _a1 types.RequestQuery) (*abcicli.ReqRes, error) {
ret := _m.Called(_a0, _a1)

View File

@@ -295,6 +295,13 @@ func (cli *socketClient) ApplySnapshotChunkAsync(
return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req))
}
func (cli *socketClient) PrepareProposalAsync(
ctx context.Context,
req types.RequestPrepareProposal,
) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestPrepareProposal(req))
}
//----------------------------------------
func (cli *socketClient) FlushSync(ctx context.Context) error {
@@ -465,6 +472,18 @@ func (cli *socketClient) ApplySnapshotChunkSync(
return reqres.Response.GetApplySnapshotChunk(), nil
}
func (cli *socketClient) PrepareProposalSync(
ctx context.Context,
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestPrepareProposal(req))
if err != nil {
return nil, err
}
return reqres.Response.GetPrepareProposal(), nil
}
//----------------------------------------
// queueRequest enqueues req onto the queue. If the queue is full, it ether
@@ -591,6 +610,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PrepareProposal:
_, ok = res.Value.(*types.Response_PrepareProposal)
}
return ok
}

View File

@@ -171,3 +171,9 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo
return resQuery
}
func (app *Application) PrepareProposal(
req types.RequestPrepareProposal) types.ResponsePrepareProposal {
return types.ResponsePrepareProposal{
BlockData: req.BlockData}
}

View File

@@ -170,6 +170,15 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}
func (app *PersistentKVStoreApplication) PrepareProposal(
req types.RequestPrepareProposal) types.ResponsePrepareProposal {
if len(req.BlockData) >= 1 {
req.BlockData[1] = []byte("modified tx")
}
return types.ResponsePrepareProposal{BlockData: req.BlockData}
}
//---------------------------------------------
// update validators

View File

@@ -227,6 +227,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
case *types.Request_PrepareProposal:
res := s.app.PrepareProposal(*r.PrepareProposal)
responses <- types.ToResponsePrepareProposal(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)

View File

@@ -17,7 +17,8 @@ type Application interface {
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
PrepareProposal(RequestPrepareProposal) ResponsePrepareProposal
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
@@ -90,6 +91,10 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons
return ResponseApplySnapshotChunk{}
}
func (BaseApplication) PrepareProposal(req RequestPrepareProposal) ResponsePrepareProposal {
return ResponsePrepareProposal{}
}
//-------------------------------------------------------
// GRPCApplication is a GRPC wrapper for Application
@@ -172,3 +177,9 @@ func (app *GRPCApplication) ApplySnapshotChunk(
res := app.app.ApplySnapshotChunk(*req)
return &res, nil
}
func (app *GRPCApplication) PrepareProposal(
ctx context.Context, req *RequestPrepareProposal) (*ResponsePrepareProposal, error) {
res := app.app.PrepareProposal(*req)
return &res, nil
}

View File

@@ -114,6 +114,12 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request {
}
}
func ToRequestPrepareProposal(req RequestPrepareProposal) *Request {
return &Request{
Value: &Request_PrepareProposal{&req},
}
}
//----------------------------------------
func ToResponseException(errStr string) *Response {
@@ -204,3 +210,9 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response {
Value: &Response_ApplySnapshotChunk{&res},
}
}
func ToResponsePrepareProposal(res ResponsePrepareProposal) *Response {
return &Response{
Value: &Response_PrepareProposal{&res},
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -270,3 +270,8 @@ func (app *CounterApplication) Commit() abci.ResponseCommit {
binary.BigEndian.PutUint64(hash, uint64(app.txCount))
return abci.ResponseCommit{Data: hash}
}
func (app *CounterApplication) PrepareProposal(
req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
return abci.ResponsePrepareProposal{BlockData: req.BlockData} //nolint:gosimple
}

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -1,4 +1,4 @@
// nolint
//nolint
package query
import (

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -35,6 +35,7 @@ message Request {
RequestOfferSnapshot offer_snapshot = 12;
RequestLoadSnapshotChunk load_snapshot_chunk = 13;
RequestApplySnapshotChunk apply_snapshot_chunk = 14;
RequestPrepareProposal prepare_proposal = 15;
}
}
@@ -117,6 +118,15 @@ message RequestApplySnapshotChunk {
string sender = 3;
}
message RequestPrepareProposal {
// block_data is an array of transactions that will be included in a block,
// sent to the app for possible modifications.
// applications can not exceed the size of the data passed to it.
repeated bytes block_data = 1;
// If an application decides to populate block_data with extra information, they can not exceed this value.
int64 block_data_size = 2;
}
//----------------------------------------
// Response types
@@ -137,6 +147,7 @@ message Response {
ResponseOfferSnapshot offer_snapshot = 13;
ResponseLoadSnapshotChunk load_snapshot_chunk = 14;
ResponseApplySnapshotChunk apply_snapshot_chunk = 15;
ResponsePrepareProposal prepare_proposal = 16;
}
}
@@ -259,6 +270,10 @@ message ResponseApplySnapshotChunk {
}
}
message ResponsePrepareProposal {
repeated bytes block_data = 1;
}
//----------------------------------------
// Misc.
@@ -363,4 +378,5 @@ service ABCIApplication {
rpc OfferSnapshot(RequestOfferSnapshot) returns (ResponseOfferSnapshot);
rpc LoadSnapshotChunk(RequestLoadSnapshotChunk) returns (ResponseLoadSnapshotChunk);
rpc ApplySnapshotChunk(RequestApplySnapshotChunk) returns (ResponseApplySnapshotChunk);
rpc PrepareProposal(RequestPrepareProposal) returns (ResponsePrepareProposal);
}

View File

@@ -17,6 +17,7 @@ type AppConnConsensus interface {
Error() error
InitChainSync(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
PrepareProposalSync(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
BeginBlockSync(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
DeliverTxAsync(context.Context, types.RequestDeliverTx) (*abcicli.ReqRes, error)
@@ -80,6 +81,13 @@ func (app *appConnConsensus) InitChainSync(
return app.appConn.InitChainSync(ctx, req)
}
func (app *appConnConsensus) PrepareProposalSync(
ctx context.Context,
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
return app.appConn.PrepareProposalSync(ctx, req)
}
func (app *appConnConsensus) BeginBlockSync(
ctx context.Context,
req types.RequestBeginBlock,

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks
@@ -146,6 +146,29 @@ func (_m *AppConnConsensus) InitChainSync(_a0 context.Context, _a1 types.Request
return r0, r1
}
// PrepareProposalSync provides a mock function with given fields: _a0, _a1
func (_m *AppConnConsensus) PrepareProposalSync(_a0 context.Context, _a1 types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
ret := _m.Called(_a0, _a1)
var r0 *types.ResponsePrepareProposal
if rf, ok := ret.Get(0).(func(context.Context, types.RequestPrepareProposal) *types.ResponsePrepareProposal); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponsePrepareProposal)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestPrepareProposal) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SetResponseCallback provides a mock function with given fields: _a0
func (_m *AppConnConsensus) SetResponseCallback(_a0 abcicli.Callback) {
_m.Called(_a0)

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -99,6 +99,8 @@ func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher)
// and txs from the mempool. The max bytes must be big enough to fit the commit.
// Up to 1/10th of the block space is allcoated for maximum sized evidence.
// The rest is given to txs, up to the max gas.
//
// Contract: application will not return more bytes than are sent over the wire.
func (blockExec *BlockExecutor) CreateProposalBlock(
height int64,
state State, commit *types.Commit,
@@ -115,7 +117,34 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
return state.MakeBlock(height, txs, commit, evidence, proposerAddr)
preparedProposal, err := blockExec.proxyApp.PrepareProposalSync(
context.Background(),
abci.RequestPrepareProposal{BlockData: txs.ToSliceOfBytes(), BlockDataSize: maxDataBytes},
)
if err != nil {
// The App MUST ensure that only valid (and hence 'processable') transactions
// enter the mempool. Hence, at this point, we can't have any non-processable
// transaction causing an error.
//
// Also, the App can simply skip any transaction that could cause any kind of trouble.
// Either way, we can not recover in a meaningful way, unless we skip proposing
// this block, repair what caused the error and try again. Hence, we panic on
// purpose for now.
panic(err)
}
newTxs := preparedProposal.GetBlockData()
var txSize int
for _, tx := range newTxs {
txSize += len(tx)
if maxDataBytes < int64(txSize) {
panic("block data exceeds max amount of allowed bytes")
}
}
modifiedTxs := types.ToTxs(preparedProposal.GetBlockData())
return state.MakeBlock(height, modifiedTxs, commit, evidence, proposerAddr)
}
// ValidateBlock validates the given block against the given state.

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks

View File

@@ -204,6 +204,11 @@ func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) a
return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}
}
func (app *Application) PrepareProposal(
req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
return abci.ResponsePrepareProposal{BlockData: req.BlockData} //nolint:gosimple
}
// validatorUpdates generates a validator set update.
func (app *Application) validatorUpdates(height uint64) (abci.ValidatorUpdates, error) {
updates := app.cfg.ValidatorUpdates[fmt.Sprintf("%v", height)]

View File

@@ -79,6 +79,27 @@ func (txs Txs) Proof(i int) TxProof {
}
}
// ToSliceOfBytes converts a Txs to slice of byte slices.
//
// NOTE: This method should become obsolete once Txs is switched to [][]byte.
// ref: #2603
func (txs Txs) ToSliceOfBytes() [][]byte {
txBzs := make([][]byte, len(txs))
for i := 0; i < len(txs); i++ {
txBzs[i] = txs[i]
}
return txBzs
}
// ToTxs converts a raw slice of byte slices into a Txs type.
func ToTxs(txs [][]byte) Txs {
txBzs := make(Txs, len(txs))
for i := 0; i < len(txs); i++ {
txBzs[i] = txs[i]
}
return txBzs
}
// TxProof represents a Merkle proof of the presence of a transaction in the Merkle tree.
type TxProof struct {
RootHash tmbytes.HexBytes `json:"root_hash"`