diff --git a/abci/client/client.go b/abci/client/client.go index 387fc551a..e14956a4b 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -46,6 +46,7 @@ type Client interface { OfferSnapshotAsync(context.Context, types.RequestOfferSnapshot) (*ReqRes, error) LoadSnapshotChunkAsync(context.Context, types.RequestLoadSnapshotChunk) (*ReqRes, error) ApplySnapshotChunkAsync(context.Context, types.RequestApplySnapshotChunk) (*ReqRes, error) + FinalizeBlockAsync(context.Context, types.RequestFinalizeBlock) (*ReqRes, error) // Synchronous requests FlushSync(context.Context) error @@ -62,6 +63,7 @@ type Client interface { OfferSnapshotSync(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) LoadSnapshotChunkSync(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) ApplySnapshotChunkSync(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) + FinalizeBlockSync(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) } //---------------------------------------- diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 31bd6fae1..021528f84 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -314,6 +314,22 @@ func (cli *grpcClient) ApplySnapshotChunkAsync( ) } +func (cli *grpcClient) FinalizeBlockAsync( + ctx context.Context, + params types.RequestFinalizeBlock, +) (*ReqRes, error) { + req := types.ToRequestFinalizeBlock(params) + res, err := cli.client.FinalizeBlock(ctx, req.GetFinalizeBlock(), grpc.WaitForReady(true)) + if err != nil { + return nil, err + } + return cli.finishAsyncCall( + ctx, + req, + &types.Response{Value: &types.Response_FinalizeBlock{FinalizeBlock: res}}, + ) +} + // finishAsyncCall creates a ReqRes for an async call, and immediately populates it // with the response. We don't complete it until it's been ordered via the channel. func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) { @@ -504,3 +520,14 @@ func (cli *grpcClient) ApplySnapshotChunkSync( } return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error() } + +func (cli *grpcClient) FinalizeBlockSync( + ctx context.Context, + params types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + + reqres, err := cli.FinalizeBlockAsync(ctx, params) + if err != nil { + return nil, err + } + return cli.finishSyncCall(reqres).GetFinalizeBlock(), cli.Error() +} diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 69457b5b0..33733e573 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -204,6 +204,20 @@ func (app *localClient) ApplySnapshotChunkAsync( ), nil } +func (app *localClient) FinalizeBlockAsync( + ctx context.Context, + req types.RequestFinalizeBlock, +) (*ReqRes, error) { + app.mtx.Lock() + defer app.mtx.Unlock() + + res := app.Application.FinalizeBlock(req) + return app.callback( + types.ToRequestFinalizeBlock(req), + types.ToResponseFinalizeBlock(res), + ), nil +} + //------------------------------------------------------- func (app *localClient) FlushSync(ctx context.Context) error { @@ -346,6 +360,17 @@ func (app *localClient) ApplySnapshotChunkSync( return &res, nil } +func (app *localClient) FinalizeBlockSync( + ctx context.Context, + req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + + app.mtx.Lock() + defer app.mtx.Unlock() + + res := app.Application.FinalizeBlock(req) + return &res, nil +} + //------------------------------------------------------- func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes { diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 3fef8540d..762395fee 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -295,6 +295,13 @@ func (cli *socketClient) ApplySnapshotChunkAsync( return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req)) } +func (cli *socketClient) FinalizeBlockAsync( + ctx context.Context, + req types.RequestFinalizeBlock, +) (*ReqRes, error) { + return cli.queueRequestAsync(ctx, types.ToRequestFinalizeBlock(req)) +} + //---------------------------------------- func (cli *socketClient) FlushSync(ctx context.Context) error { @@ -465,6 +472,17 @@ func (cli *socketClient) ApplySnapshotChunkSync( return reqres.Response.GetApplySnapshotChunk(), nil } +func (cli *socketClient) FinalizeBlockSync( + ctx context.Context, + req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + + reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestFinalizeBlock(req)) + if err != nil { + return nil, err + } + return reqres.Response.GetFinalizeBlock(), nil +} + //---------------------------------------- // queueRequest enqueues req onto the queue. If the queue is full, it ether @@ -591,6 +609,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { _, ok = res.Value.(*types.Response_ListSnapshots) case *types.Request_OfferSnapshot: _, ok = res.Value.(*types.Response_OfferSnapshot) + case *types.Request_FinalizeBlock: + _, ok = res.Value.(*types.Response_FinalizeBlock) } return ok } diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 97256c8ac..3cf9c5413 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -117,6 +117,43 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} } +// tx is either "key=value" or just arbitrary bytes +func (app *Application) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { + var key, value string + var txs = make([]*types.ResponseDeliverTx, len(req.Txs)) + + for i, tx := range req.Txs { + parts := bytes.Split(tx, []byte("=")) + if len(parts) == 2 { + key, value = string(parts[0]), string(parts[1]) + } else { + key, value = string(tx), string(tx) + } + + err := app.state.db.Set(prefixKey([]byte(key)), []byte(value)) + if err != nil { + panic(err) + } + app.state.Size++ + + events := []types.Event{ + { + Type: "app", + Attributes: []types.EventAttribute{ + {Key: "creator", Value: "Cosmoshi Netowoko", Index: true}, + {Key: "key", Value: key, Index: true}, + {Key: "index_key", Value: "index is working", Index: true}, + {Key: "noindex_key", Value: "index is working", Index: false}, + }, + }, + } + + txs[i] = &types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} + } + + return types.ResponseFinalizeBlock{Txs: txs} +} + func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index 0fcfcadf7..a935255d7 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -170,6 +170,44 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk( return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} } +func (app *PersistentKVStoreApplication) FinalizeBlock( + req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { + // for i, tx := range req.Txs { + // // if it starts with "val:", update the validator set + // // format is "val:pubkey!power" + // if isValidatorTx(tx) { + // // update validators in the merkle tree + // // and in app.ValUpdates + // return app.execValidatorTx(req.Tx) + // } + + // // otherwise, update the key-value store + // return app.app.DeliverTx(tx) + // } + // reset valset changes + app.ValUpdates = make([]types.ValidatorUpdate, 0) + + // Punish validators who committed equivocation. + for _, ev := range req.ByzantineValidators { + if ev.Type == types.EvidenceType_DUPLICATE_VOTE { + addr := string(ev.Validator.Address) + if pubKey, ok := app.valAddrToPubKeyMap[addr]; ok { + app.updateValidator(types.ValidatorUpdate{ + PubKey: pubKey, + Power: ev.Validator.Power - 1, + }) + app.logger.Info("Decreased val power by 1 because of the equivocation", + "val", addr) + } else { + app.logger.Error("Wanted to punish val, but can't find it", + "val", addr) + } + } + } + + return types.ResponseFinalizeBlock{ValidatorUpdates: app.ValUpdates} +} + //--------------------------------------------- // update validators diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index 543b444b1..0bcd84542 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -233,6 +233,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types case *types.Request_ApplySnapshotChunk: res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk) responses <- types.ToResponseApplySnapshotChunk(res) + case *types.Request_FinalizeBlock: + res := s.app.FinalizeBlock(*r.FinalizeBlock) + responses <- types.ToResponseFinalizeBlock(res) default: responses <- types.ToResponseException("Unknown request") } diff --git a/abci/types/application.go b/abci/types/application.go index 2a3cabd8b..4fe90b473 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -21,7 +21,8 @@ type Application interface { BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set - Commit() ResponseCommit // Commit the state and return the application Merkle root hash + FinalizeBlock(RequestFinalizeBlock) ResponseFinalizeBlock + Commit() ResponseCommit // Commit the state and return the application Merkle root hash // State Sync Connection ListSnapshots(RequestListSnapshots) ResponseListSnapshots // List available snapshots @@ -90,6 +91,10 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons return ResponseApplySnapshotChunk{} } +func (BaseApplication) FinalizeBlock(req RequestFinalizeBlock) ResponseFinalizeBlock { + return ResponseFinalizeBlock{} +} + //------------------------------------------------------- // GRPCApplication is a GRPC wrapper for Application @@ -172,3 +177,9 @@ func (app *GRPCApplication) ApplySnapshotChunk( res := app.app.ApplySnapshotChunk(*req) return &res, nil } + +func (app *GRPCApplication) FinalizeBlock( + ctx context.Context, req *RequestFinalizeBlock) (*ResponseFinalizeBlock, error) { + res := app.app.FinalizeBlock(*req) + return &res, nil +} diff --git a/abci/types/messages.go b/abci/types/messages.go index e0605c4e5..bb5d6a176 100644 --- a/abci/types/messages.go +++ b/abci/types/messages.go @@ -114,6 +114,12 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request { } } +func ToRequestFinalizeBlock(req RequestFinalizeBlock) *Request { + return &Request{ + Value: &Request_FinalizeBlock{&req}, + } +} + //---------------------------------------- func ToResponseException(errStr string) *Response { @@ -204,3 +210,9 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response { Value: &Response_ApplySnapshotChunk{&res}, } } + +func ToResponseFinalizeBlock(res ResponseFinalizeBlock) *Response { + return &Response{ + Value: &Response_FinalizeBlock{&res}, + } +} diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 9165bdad4..13e12292b 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -21,6 +21,7 @@ type AppConnConsensus interface { BeginBlockSync(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error) DeliverTxAsync(context.Context, types.RequestDeliverTx) (*abcicli.ReqRes, error) EndBlockSync(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error) + FinalizeBlockSync(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) CommitSync(context.Context) (*types.ResponseCommit, error) } @@ -102,6 +103,10 @@ func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCom return app.appConn.CommitSync(ctx) } +func (app *appConnConsensus) FinalizeBlockSync(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + return app.appConn.FinalizeBlockSync(ctx, req) +} + //------------------------------------------------ // Implements AppConnMempool (subset of abcicli.Client)