mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-20 22:21:28 +00:00
migrate abci to finalizeBlock
This commit is contained in:
@@ -46,6 +46,7 @@ type Client interface {
|
||||
OfferSnapshotAsync(context.Context, types.RequestOfferSnapshot) (*ReqRes, error)
|
||||
LoadSnapshotChunkAsync(context.Context, types.RequestLoadSnapshotChunk) (*ReqRes, error)
|
||||
ApplySnapshotChunkAsync(context.Context, types.RequestApplySnapshotChunk) (*ReqRes, error)
|
||||
FinalizeBlockAsync(context.Context, types.RequestFinalizeBlock) (*ReqRes, error)
|
||||
|
||||
// Synchronous requests
|
||||
FlushSync(context.Context) error
|
||||
@@ -62,6 +63,7 @@ type Client interface {
|
||||
OfferSnapshotSync(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
|
||||
LoadSnapshotChunkSync(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
|
||||
ApplySnapshotChunkSync(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
|
||||
FinalizeBlockSync(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
@@ -314,6 +314,22 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(
|
||||
)
|
||||
}
|
||||
|
||||
func (cli *grpcClient) FinalizeBlockAsync(
|
||||
ctx context.Context,
|
||||
params types.RequestFinalizeBlock,
|
||||
) (*ReqRes, error) {
|
||||
req := types.ToRequestFinalizeBlock(params)
|
||||
res, err := cli.client.FinalizeBlock(ctx, req.GetFinalizeBlock(), grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cli.finishAsyncCall(
|
||||
ctx,
|
||||
req,
|
||||
&types.Response{Value: &types.Response_FinalizeBlock{FinalizeBlock: res}},
|
||||
)
|
||||
}
|
||||
|
||||
// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
|
||||
// with the response. We don't complete it until it's been ordered via the channel.
|
||||
func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
|
||||
@@ -504,3 +520,14 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
|
||||
}
|
||||
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
|
||||
}
|
||||
|
||||
func (cli *grpcClient) FinalizeBlockSync(
|
||||
ctx context.Context,
|
||||
params types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
|
||||
|
||||
reqres, err := cli.FinalizeBlockAsync(ctx, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cli.finishSyncCall(reqres).GetFinalizeBlock(), cli.Error()
|
||||
}
|
||||
|
||||
@@ -204,6 +204,20 @@ func (app *localClient) ApplySnapshotChunkAsync(
|
||||
), nil
|
||||
}
|
||||
|
||||
func (app *localClient) FinalizeBlockAsync(
|
||||
ctx context.Context,
|
||||
req types.RequestFinalizeBlock,
|
||||
) (*ReqRes, error) {
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
res := app.Application.FinalizeBlock(req)
|
||||
return app.callback(
|
||||
types.ToRequestFinalizeBlock(req),
|
||||
types.ToResponseFinalizeBlock(res),
|
||||
), nil
|
||||
}
|
||||
|
||||
//-------------------------------------------------------
|
||||
|
||||
func (app *localClient) FlushSync(ctx context.Context) error {
|
||||
@@ -346,6 +360,17 @@ func (app *localClient) ApplySnapshotChunkSync(
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (app *localClient) FinalizeBlockSync(
|
||||
ctx context.Context,
|
||||
req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
|
||||
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
res := app.Application.FinalizeBlock(req)
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
//-------------------------------------------------------
|
||||
|
||||
func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
|
||||
|
||||
@@ -295,6 +295,13 @@ func (cli *socketClient) ApplySnapshotChunkAsync(
|
||||
return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req))
|
||||
}
|
||||
|
||||
func (cli *socketClient) FinalizeBlockAsync(
|
||||
ctx context.Context,
|
||||
req types.RequestFinalizeBlock,
|
||||
) (*ReqRes, error) {
|
||||
return cli.queueRequestAsync(ctx, types.ToRequestFinalizeBlock(req))
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
func (cli *socketClient) FlushSync(ctx context.Context) error {
|
||||
@@ -465,6 +472,17 @@ func (cli *socketClient) ApplySnapshotChunkSync(
|
||||
return reqres.Response.GetApplySnapshotChunk(), nil
|
||||
}
|
||||
|
||||
func (cli *socketClient) FinalizeBlockSync(
|
||||
ctx context.Context,
|
||||
req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
|
||||
|
||||
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestFinalizeBlock(req))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return reqres.Response.GetFinalizeBlock(), nil
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
// queueRequest enqueues req onto the queue. If the queue is full, it ether
|
||||
@@ -591,6 +609,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
|
||||
_, ok = res.Value.(*types.Response_ListSnapshots)
|
||||
case *types.Request_OfferSnapshot:
|
||||
_, ok = res.Value.(*types.Response_OfferSnapshot)
|
||||
case *types.Request_FinalizeBlock:
|
||||
_, ok = res.Value.(*types.Response_FinalizeBlock)
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
@@ -117,6 +117,43 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
|
||||
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
|
||||
}
|
||||
|
||||
// tx is either "key=value" or just arbitrary bytes
|
||||
func (app *Application) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock {
|
||||
var key, value string
|
||||
var txs = make([]*types.ResponseDeliverTx, len(req.Txs))
|
||||
|
||||
for i, tx := range req.Txs {
|
||||
parts := bytes.Split(tx, []byte("="))
|
||||
if len(parts) == 2 {
|
||||
key, value = string(parts[0]), string(parts[1])
|
||||
} else {
|
||||
key, value = string(tx), string(tx)
|
||||
}
|
||||
|
||||
err := app.state.db.Set(prefixKey([]byte(key)), []byte(value))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
app.state.Size++
|
||||
|
||||
events := []types.Event{
|
||||
{
|
||||
Type: "app",
|
||||
Attributes: []types.EventAttribute{
|
||||
{Key: "creator", Value: "Cosmoshi Netowoko", Index: true},
|
||||
{Key: "key", Value: key, Index: true},
|
||||
{Key: "index_key", Value: "index is working", Index: true},
|
||||
{Key: "noindex_key", Value: "index is working", Index: false},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
txs[i] = &types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
|
||||
}
|
||||
|
||||
return types.ResponseFinalizeBlock{Txs: txs}
|
||||
}
|
||||
|
||||
func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
|
||||
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
|
||||
}
|
||||
|
||||
@@ -170,6 +170,44 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
|
||||
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
|
||||
}
|
||||
|
||||
func (app *PersistentKVStoreApplication) FinalizeBlock(
|
||||
req types.RequestFinalizeBlock) types.ResponseFinalizeBlock {
|
||||
// for i, tx := range req.Txs {
|
||||
// // if it starts with "val:", update the validator set
|
||||
// // format is "val:pubkey!power"
|
||||
// if isValidatorTx(tx) {
|
||||
// // update validators in the merkle tree
|
||||
// // and in app.ValUpdates
|
||||
// return app.execValidatorTx(req.Tx)
|
||||
// }
|
||||
|
||||
// // otherwise, update the key-value store
|
||||
// return app.app.DeliverTx(tx)
|
||||
// }
|
||||
// reset valset changes
|
||||
app.ValUpdates = make([]types.ValidatorUpdate, 0)
|
||||
|
||||
// Punish validators who committed equivocation.
|
||||
for _, ev := range req.ByzantineValidators {
|
||||
if ev.Type == types.EvidenceType_DUPLICATE_VOTE {
|
||||
addr := string(ev.Validator.Address)
|
||||
if pubKey, ok := app.valAddrToPubKeyMap[addr]; ok {
|
||||
app.updateValidator(types.ValidatorUpdate{
|
||||
PubKey: pubKey,
|
||||
Power: ev.Validator.Power - 1,
|
||||
})
|
||||
app.logger.Info("Decreased val power by 1 because of the equivocation",
|
||||
"val", addr)
|
||||
} else {
|
||||
app.logger.Error("Wanted to punish val, but can't find it",
|
||||
"val", addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return types.ResponseFinalizeBlock{ValidatorUpdates: app.ValUpdates}
|
||||
}
|
||||
|
||||
//---------------------------------------------
|
||||
// update validators
|
||||
|
||||
|
||||
@@ -233,6 +233,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
|
||||
case *types.Request_ApplySnapshotChunk:
|
||||
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
|
||||
responses <- types.ToResponseApplySnapshotChunk(res)
|
||||
case *types.Request_FinalizeBlock:
|
||||
res := s.app.FinalizeBlock(*r.FinalizeBlock)
|
||||
responses <- types.ToResponseFinalizeBlock(res)
|
||||
default:
|
||||
responses <- types.ToResponseException("Unknown request")
|
||||
}
|
||||
|
||||
@@ -21,7 +21,8 @@ type Application interface {
|
||||
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
|
||||
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
|
||||
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
|
||||
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
|
||||
FinalizeBlock(RequestFinalizeBlock) ResponseFinalizeBlock
|
||||
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
|
||||
|
||||
// State Sync Connection
|
||||
ListSnapshots(RequestListSnapshots) ResponseListSnapshots // List available snapshots
|
||||
@@ -90,6 +91,10 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons
|
||||
return ResponseApplySnapshotChunk{}
|
||||
}
|
||||
|
||||
func (BaseApplication) FinalizeBlock(req RequestFinalizeBlock) ResponseFinalizeBlock {
|
||||
return ResponseFinalizeBlock{}
|
||||
}
|
||||
|
||||
//-------------------------------------------------------
|
||||
|
||||
// GRPCApplication is a GRPC wrapper for Application
|
||||
@@ -172,3 +177,9 @@ func (app *GRPCApplication) ApplySnapshotChunk(
|
||||
res := app.app.ApplySnapshotChunk(*req)
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (app *GRPCApplication) FinalizeBlock(
|
||||
ctx context.Context, req *RequestFinalizeBlock) (*ResponseFinalizeBlock, error) {
|
||||
res := app.app.FinalizeBlock(*req)
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
@@ -114,6 +114,12 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request {
|
||||
}
|
||||
}
|
||||
|
||||
func ToRequestFinalizeBlock(req RequestFinalizeBlock) *Request {
|
||||
return &Request{
|
||||
Value: &Request_FinalizeBlock{&req},
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
func ToResponseException(errStr string) *Response {
|
||||
@@ -204,3 +210,9 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response {
|
||||
Value: &Response_ApplySnapshotChunk{&res},
|
||||
}
|
||||
}
|
||||
|
||||
func ToResponseFinalizeBlock(res ResponseFinalizeBlock) *Response {
|
||||
return &Response{
|
||||
Value: &Response_FinalizeBlock{&res},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ type AppConnConsensus interface {
|
||||
BeginBlockSync(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
|
||||
DeliverTxAsync(context.Context, types.RequestDeliverTx) (*abcicli.ReqRes, error)
|
||||
EndBlockSync(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error)
|
||||
FinalizeBlockSync(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
|
||||
CommitSync(context.Context) (*types.ResponseCommit, error)
|
||||
}
|
||||
|
||||
@@ -102,6 +103,10 @@ func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCom
|
||||
return app.appConn.CommitSync(ctx)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) FinalizeBlockSync(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
|
||||
return app.appConn.FinalizeBlockSync(ctx, req)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnMempool (subset of abcicli.Client)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user