diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index e5a7e5c5f..962b5698f 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -125,6 +125,7 @@ func (cli *grpcClient) OnStop() { if cli.conn != nil { cli.conn.Close() } + close(cli.chReqRes) } func (cli *grpcClient) StopForError(err error) { @@ -147,7 +148,6 @@ func (cli *grpcClient) StopForError(err error) { func (cli *grpcClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() - return cli.err } @@ -181,7 +181,10 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) //---------------------------------------- -func (cli *grpcClient) Flush(ctx context.Context) error { return nil } +func (cli *grpcClient) Flush(ctx context.Context) error { + _, err := cli.client.Flush(ctx, types.ToRequestFlush().GetFlush(), grpc.WaitForReady(true)) + return err +} func (cli *grpcClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { return cli.client.Echo(ctx, types.ToRequestEcho(msg).GetEcho(), grpc.WaitForReady(true)) diff --git a/abci/client/grpc_client_test.go b/abci/client/grpc_client_test.go new file mode 100644 index 000000000..7162ad7bb --- /dev/null +++ b/abci/client/grpc_client_test.go @@ -0,0 +1,80 @@ +package abcicli_test + +import ( + "fmt" + "math/rand" + "net" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "google.golang.org/grpc" + + "golang.org/x/net/context" + + "github.com/tendermint/tendermint/libs/log" + tmnet "github.com/tendermint/tendermint/libs/net" + + abciserver "github.com/tendermint/tendermint/abci/server" + "github.com/tendermint/tendermint/abci/types" +) + +func TestGRPC(t *testing.T) { + app := types.NewBaseApplication() + numCheckTxs := 2000 + socketFile := fmt.Sprintf("/tmp/test-%08x.sock", rand.Int31n(1<<30)) + defer os.Remove(socketFile) + socket := fmt.Sprintf("unix://%v", socketFile) + + // Start the listener + server := abciserver.NewGRPCServer(socket, app) + server.SetLogger(log.TestingLogger().With("module", "abci-server")) + err := server.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if err := server.Stop(); err != nil { + t.Error(err) + } + }) + + // Connect to the socket + //nolint:staticcheck // SA1019 Existing use of deprecated but supported dial option. 
+ conn, err := grpc.Dial(socket, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) + require.NoError(t, err) + + t.Cleanup(func() { + if err := conn.Close(); err != nil { + t.Error(err) + } + }) + + client := types.NewABCIClient(conn) + + // Write requests + for counter := 0; counter < numCheckTxs; counter++ { + // Send request + response, err := client.CheckTx(context.Background(), &types.RequestCheckTx{Tx: []byte("test")}) + require.NoError(t, err) + counter++ + if response.Code != 0 { + t.Error("CheckTx failed with ret_code", response.Code) + } + if counter > numCheckTxs { + t.Fatal("Too many CheckTx responses") + } + t.Log("response", counter) + if counter == numCheckTxs { + go func() { + time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow + }() + } + + } +} + +func dialerFunc(ctx context.Context, addr string) (net.Conn, error) { + return tmnet.Connect(addr) +} diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index c0ebb771b..2ed0f767e 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -14,10 +14,12 @@ import ( "github.com/tendermint/tendermint/abci/types" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/libs/timer" ) const ( - reqQueueSize = 256 // TODO make configurable + reqQueueSize = 256 // TODO make configurable + flushThrottleMS = 20 // Don't wait longer than... ) // socketClient is the client side implementation of the Tendermint @@ -26,8 +28,6 @@ const ( // // This is goroutine-safe. All calls are serialized to the server through an unbuffered queue. The socketClient // tracks responses and expects them to respect the order of the requests sent. -// -// The buffer is flushed after every message sent. type socketClient struct { service.BaseService @@ -35,7 +35,8 @@ type socketClient struct { mustConnect bool conn net.Conn - reqQueue chan *ReqRes + reqQueue chan *ReqRes + flushTimer *timer.ThrottleTimer mtx sync.Mutex err error @@ -51,10 +52,12 @@ var _ Client = (*socketClient)(nil) func NewSocketClient(addr string, mustConnect bool) Client { cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), + flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, - addr: addr, - reqSent: list.New(), - resCb: nil, + + addr: addr, + reqSent: list.New(), + resCb: nil, } cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) return cli @@ -95,6 +98,7 @@ func (cli *socketClient) OnStop() { } cli.flushQueue() + cli.flushTimer.Stop() } // Error returns an error if the client was stopped abruptly. @@ -123,26 +127,37 @@ func (cli *socketClient) CheckTxAsync(ctx context.Context, req *types.RequestChe //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - bw := bufio.NewWriter(conn) + w := bufio.NewWriter(conn) for { select { - case <-cli.Quit(): - return case reqres := <-cli.reqQueue: // N.B. We must enqueue before sending out the request, otherwise the // server may reply before we do it, and the receiver will fail for an // unsolicited reply. cli.trackRequest(reqres) - if err := types.WriteMessage(reqres.Request, bw); err != nil { + err := types.WriteMessage(reqres.Request, w) + if err != nil { cli.stopForError(fmt.Errorf("write to buffer: %w", err)) return } - if err := bw.Flush(); err != nil { - cli.stopForError(fmt.Errorf("flush buffer: %w", err)) - return + // If it's a flush request, flush the current buffer. 
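+			// Writes for every other request type are left in the bufio.Writer,
+			// so bursts of requests reach the socket in as few syscalls as
+			// possible; the flushTimer case below bounds how long a request
+			// can sit unflushed.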
+ if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { + err = w.Flush() + if err != nil { + cli.stopForError(fmt.Errorf("flush buffer: %w", err)) + return + } } + case <-cli.flushTimer.Ch: // flush queue + select { + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): + default: + // Probably will fill the buffer, or retry later. + } + case <-cli.Quit(): + return } } } @@ -155,8 +170,8 @@ func (cli *socketClient) recvResponseRoutine(conn io.Reader) { } var res = &types.Response{} - - if err := types.ReadMessage(r, res); err != nil { + err := types.ReadMessage(r, res) + if err != nil { cli.stopForError(fmt.Errorf("read message: %w", err)) return } @@ -221,6 +236,182 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { return nil } +//---------------------------------------- + +func (cli *socketClient) Flush(ctx context.Context) error { + reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush()) + if err != nil { + return err + } + reqRes.Wait() + return nil +} + +func (cli *socketClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestEcho(msg)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetEcho(), cli.Error() +} + +func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestInfo(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetInfo(), cli.Error() +} + +func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestCheckTx(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetCheckTx(), cli.Error() +} + +func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestQuery(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetQuery(), cli.Error() +} + +func (cli *socketClient) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestCommit()) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetCommit(), cli.Error() +} + +func (cli *socketClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestInitChain(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetInitChain(), cli.Error() +} + +func (cli *socketClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestListSnapshots(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetListSnapshots(), cli.Error() +} + +func (cli *socketClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + reqRes, err := 
cli.queueRequest(ctx, types.ToRequestOfferSnapshot(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetOfferSnapshot(), cli.Error() +} + +func (cli *socketClient) LoadSnapshotChunk(ctx context.Context, req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestLoadSnapshotChunk(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetLoadSnapshotChunk(), cli.Error() +} + +func (cli *socketClient) ApplySnapshotChunk(ctx context.Context, req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestApplySnapshotChunk(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetApplySnapshotChunk(), cli.Error() +} + +func (cli *socketClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestPrepareProposal(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetPrepareProposal(), cli.Error() +} + +func (cli *socketClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestProcessProposal(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetProcessProposal(), cli.Error() +} + +func (cli *socketClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestExtendVote(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetExtendVote(), nil +} + +func (cli *socketClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestVerifyVoteExtension(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetVerifyVoteExtension(), nil +} + +func (cli *socketClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + reqRes, err := cli.queueRequest(ctx, types.ToRequestFinalizeBlock(req)) + if err != nil { + return nil, err + } + if err := cli.Flush(ctx); err != nil { + return nil, err + } + return reqRes.Response.GetFinalizeBlock(), cli.Error() +} + func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) { reqres := NewReqRes(req) @@ -231,6 +422,14 @@ func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) ( return nil, ctx.Err() } + // Maybe auto-flush, or unset auto-flush + switch req.Value.(type) { + case *types.Request_Flush: + cli.flushTimer.Unset() + default: + cli.flushTimer.Set() + } + return reqres, nil } @@ -260,151 +459,6 @@ LOOP: //---------------------------------------- -func (cli *socketClient) Flush(ctx context.Context) error { - _, err := cli.queueRequest(ctx, types.ToRequestFlush()) - if err != nil { - return err - 
} - return nil -} - -func (cli *socketClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestEcho(msg)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetEcho(), cli.Error() -} - -func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestInfo(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetInfo(), cli.Error() -} - -func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestCheckTx(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetCheckTx(), cli.Error() -} - -func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestQuery(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetQuery(), cli.Error() -} - -func (cli *socketClient) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestCommit()) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetCommit(), cli.Error() -} - -func (cli *socketClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestInitChain(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetInitChain(), cli.Error() -} - -func (cli *socketClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestListSnapshots(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetListSnapshots(), cli.Error() -} - -func (cli *socketClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestOfferSnapshot(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetOfferSnapshot(), cli.Error() -} - -func (cli *socketClient) LoadSnapshotChunk(ctx context.Context, req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestLoadSnapshotChunk(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetLoadSnapshotChunk(), cli.Error() -} - -func (cli *socketClient) ApplySnapshotChunk(ctx context.Context, req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestApplySnapshotChunk(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetApplySnapshotChunk(), cli.Error() -} - -func (cli *socketClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestPrepareProposal(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetPrepareProposal(), cli.Error() -} - -func (cli *socketClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) 
(*types.ResponseProcessProposal, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestProcessProposal(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetProcessProposal(), cli.Error() -} - -func (cli *socketClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestExtendVote(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetExtendVote(), nil -} - -func (cli *socketClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestVerifyVoteExtension(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetVerifyVoteExtension(), nil -} - -func (cli *socketClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { - reqRes, err := cli.queueRequest(ctx, types.ToRequestFinalizeBlock(req)) - if err != nil { - return nil, err - } - reqRes.Wait() - return reqRes.Response.GetFinalizeBlock(), cli.Error() -} - -//---------------------------------------- - func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { switch req.Value.(type) { case *types.Request_Echo: diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index cbc0c5dd6..54757eca8 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -3,6 +3,8 @@ package abcicli_test import ( "context" "fmt" + "math/rand" + "os" "sync" "testing" "time" @@ -33,7 +35,7 @@ func TestCalls(t *testing.T) { }() select { - case <-time.After(1 * time.Second): + case <-time.After(time.Second): require.Fail(t, "No response arrived") case err, ok := <-resp: require.True(t, ok, "Must not close channel") @@ -41,6 +43,84 @@ func TestCalls(t *testing.T) { } } +func TestHangingAsyncCalls(t *testing.T) { + app := slowApp{} + + s, c := setupClientServer(t, app) + + resp := make(chan error, 1) + go func() { + // Call CheckTx + reqres, err := c.CheckTxAsync(context.Background(), &types.RequestCheckTx{}) + require.NoError(t, err) + // wait 50 ms for all events to travel socket, but + // no response yet from server + time.Sleep(50 * time.Millisecond) + // kill the server, so the connections break + err = s.Stop() + require.NoError(t, err) + + // wait for the response from CheckTx + reqres.Wait() + resp <- c.Error() + }() + + select { + case <-time.After(time.Second): + require.Fail(t, "No response arrived") + case err, ok := <-resp: + require.True(t, ok, "Must not close channel") + assert.Error(t, err, "We should get EOF error") + } +} + +func TestBulk(t *testing.T) { + const numTxs = 700000 + // use a socket instead of a port + socketFile := fmt.Sprintf("test-%08x.sock", rand.Int31n(1<<30)) + defer os.Remove(socketFile) + socket := fmt.Sprintf("unix://%v", socketFile) + app := types.NewBaseApplication() + // Start the listener + server := server.NewSocketServer(socket, app) + t.Cleanup(func() { + if err := server.Stop(); err != nil { + t.Log(err) + } + }) + err := server.Start() + require.NoError(t, err) + + // Connect to the socket + client := abcicli.NewSocketClient(socket, false) + + t.Cleanup(func() { + if err := client.Stop(); err != nil { + t.Log(err) + } + }) + + err = client.Start() + require.NoError(t, err) + + // Construct request + rfb := &types.RequestFinalizeBlock{Txs: make([][]byte, 
numTxs)} + for counter := 0; counter < numTxs; counter++ { + rfb.Txs[counter] = []byte("test") + } + // Send bulk request + res, err := client.FinalizeBlock(context.Background(), rfb) + require.NoError(t, err) + require.Equal(t, numTxs, len(res.TxResults), "Number of txs doesn't match") + for _, tx := range res.TxResults { + require.Equal(t, uint32(0), tx.Code, "Tx failed") + } + + // Send final flush message + err = client.Flush(context.Background()) + require.NoError(t, err) +} + func setupClientServer(t *testing.T, app types.Application) ( service.Service, abcicli.Client) { t.Helper() @@ -55,7 +135,7 @@ func setupClientServer(t *testing.T, app types.Application) ( t.Cleanup(func() { if err := s.Stop(); err != nil { - t.Error(err) + t.Log(err) } }) @@ -65,13 +145,22 @@ func setupClientServer(t *testing.T, app types.Application) ( t.Cleanup(func() { if err := c.Stop(); err != nil { - t.Error(err) + t.Log(err) } }) return s, c } +type slowApp struct { + types.BaseApplication +} + +func (slowApp) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { + time.Sleep(time.Second) + return &types.ResponseCheckTx{}, nil +} + // TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when // set after the client completes the call into the app. Currently this // test relies on the callback being allowed to be invoked twice if set multiple diff --git a/abci/cmd/abci-cli/abci-cli.go b/abci/cmd/abci-cli/abci-cli.go index d1d0e4f44..11ffe2016 100644 --- a/abci/cmd/abci-cli/abci-cli.go +++ b/abci/cmd/abci-cli/abci-cli.go @@ -167,10 +167,9 @@ where example.file looks something like: check_tx 0x00 check_tx 0xff - deliver_tx 0x00 + finalize_block 0x00 check_tx 0x00 - deliver_tx 0x01 - deliver_tx 0x04 + finalize_block 0x01 0x04 0xff info `, Args: cobra.ExactArgs(0), @@ -186,7 +185,7 @@ This command opens an interactive console for running any of the other commands without opening a new connection each time `, Args: cobra.ExactArgs(0), - ValidArgs: []string{"echo", "info", "deliver_tx", "check_tx", "prepare_proposal", "process_proposal", "commit", "query"}, + ValidArgs: []string{"echo", "info", "finalize_block", "check_tx", "prepare_proposal", "process_proposal", "commit", "query"}, RunE: cmdConsole, } diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 43928ffa8..bb8149761 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -61,7 +61,7 @@ func NewPersistentApplication(dbDir string) *Application { name := "kvstore" db, err := dbm.NewGoLevelDB(name, dbDir) if err != nil { - panic(err) + panic(fmt.Errorf("failed to create persistent app at %s: %w", dbDir, err)) } return NewApplication(db) } @@ -77,9 +77,6 @@ func NewInMemoryApplication() *Application { // Tendermint will ensure it is in sync with the application by potentially replaying the blocks it has. If the // Application returns a 0 appBlockHeight, Tendermint will call InitChain to initialize the application with consensus related data func (app *Application) Info(_ context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) { - appHash := make([]byte, 8) - binary.PutVarint(appHash, app.state.Size) - // Tendermint expects the application to persist validators, on start-up we need to reload them to memory if they exist if len(app.valAddrToPubKeyMap) == 0 && app.state.Height > 0 { validators := app.getValidators() @@ -97,7 +94,7 @@ func (app *Application) Info(_ context.Context, req *types.RequestInfo) (*types. 
Version: version.ABCIVersion, AppVersion: AppVersion, LastBlockHeight: app.state.Height, - LastBlockAppHash: appHash, + LastBlockAppHash: app.state.Hash(), }, nil } diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index d2982bee8..090aceed0 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -183,6 +183,7 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp }() for { + var req = &types.Request{} err := types.ReadMessage(bufReader, req) if err != nil { @@ -315,11 +316,12 @@ func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, res closeConn <- fmt.Errorf("error writing message: %w", err) return } - - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("error flushing write buffer: %w", err) - return + if _, ok := res.Value.(*types.Response_Flush); ok { + err = bufWriter.Flush() + if err != nil { + closeConn <- fmt.Errorf("error flushing write buffer: %w", err) + return + } } // If the application has responded with an exception, the server returns the error diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index a21568546..207017f09 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -56,7 +56,7 @@ func newReactor( blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := store.NewBlockStore(blockDB) diff --git a/cmd/tendermint/commands/reindex_event.go b/cmd/tendermint/commands/reindex_event.go index 65070ef60..f5a9acb4d 100644 --- a/cmd/tendermint/commands/reindex_event.go +++ b/cmd/tendermint/commands/reindex_event.go @@ -41,7 +41,7 @@ reindex from the base block height(inclusive); and the default end-height is 0, the tooling will reindex until the latest block height(inclusive). User can omit either or both arguments. -Note: This operation requires ABCI Responses. Do not set DiscardFinalizeBlockResponses to true if you +Note: This operation requires ABCI Responses. Do not set DiscardABCIResponses to true if you want to use this command. `, Example: ` diff --git a/cmd/tendermint/commands/rollback.go b/cmd/tendermint/commands/rollback.go index 8a60e96ac..c232c0b8d 100644 --- a/cmd/tendermint/commands/rollback.go +++ b/cmd/tendermint/commands/rollback.go @@ -90,7 +90,7 @@ func loadStateAndBlockStore(config *cfg.Config) (*store.BlockStore, state.Store, return nil, nil, err } stateStore := state.NewStore(stateDB, state.StoreOptions{ - DiscardFinalizeBlockResponses: config.Storage.DiscardFinalizeBlockResponses, + DiscardABCIResponses: config.Storage.DiscardABCIResponses, }) return blockStore, stateStore, nil diff --git a/config/config.go b/config/config.go index cc2491f84..c1ad2e229 100644 --- a/config/config.go +++ b/config/config.go @@ -1120,14 +1120,14 @@ type StorageConfig struct { // Set to false to ensure ABCI responses are persisted. ABCI responses are // required for `/block_results` RPC queries, and to reindex events in the // command-line tool. - DiscardFinalizeBlockResponses bool `mapstructure:"discard_abci_responses"` + DiscardABCIResponses bool `mapstructure:"discard_abci_responses"` } // DefaultStorageConfig returns the default configuration options relating to // Tendermint storage optimization. 
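Aside on the rename above: only the Go identifier changes from DiscardFinalizeBlockResponses to DiscardABCIResponses; the `mapstructure:"discard_abci_responses"` tag and the TOML key stay the same, so existing config files keep parsing. A minimal sketch of reading the flag (assuming only the config package API shown in this diff):

	package main

	import (
		"fmt"

		cfg "github.com/tendermint/tendermint/config"
	)

	func main() {
		c := cfg.DefaultStorageConfig()
		// Same on-disk key (discard_abci_responses) as before the rename;
		// keep it false if you need /block_results or the reindex tooling.
		fmt.Println(c.DiscardABCIResponses)
	}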
func DefaultStorageConfig() *StorageConfig { return &StorageConfig{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, } } @@ -1135,7 +1135,7 @@ func DefaultStorageConfig() *StorageConfig { // testing. func TestStorageConfig() *StorageConfig { return &StorageConfig{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, } } diff --git a/config/toml.go b/config/toml.go index a5550ff63..6041b92c3 100644 --- a/config/toml.go +++ b/config/toml.go @@ -514,7 +514,7 @@ peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}" # considerable amount of disk space. Set to false to ensure ABCI responses are # persisted. ABCI responses are required for /block_results RPC queries, and to # reindex events in the command-line tool. -discard_abci_responses = {{ .Storage.DiscardFinalizeBlockResponses}} +discard_abci_responses = {{ .Storage.DiscardABCIResponses}} ####################################################### ### Transaction Indexer Configuration Options ### diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 16768a15d..253726785 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -52,7 +52,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { logger := consensusLogger().With("test", "byzantine", "validator", i) stateDB := dbm.NewMemDB() // each state needs its own db stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) cfg := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) defer os.RemoveAll(cfg.RootDir) diff --git a/consensus/common_test.go b/consensus/common_test.go index 0f2598e37..194fd85c4 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -43,7 +43,7 @@ const ( testSubscriber = "test-client" // genesis, chain_id, priv_val - ensureTimeout = time.Millisecond * 200 + ensureTimeout = time.Millisecond * 500 ) func ensureDir(dir string, mode os.FileMode) { @@ -481,7 +481,7 @@ func newState( // Make State stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) err := stateStore.Save(state) // for save height 1's validators info diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index e16b0eca0..309f429d8 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -2,7 +2,6 @@ package consensus import ( "context" - "encoding/binary" "fmt" "os" "testing" @@ -119,7 +118,7 @@ func deliverTxsRange(t *testing.T, cs *State, start, end int) { func TestMempoolTxConcurrentWithCommit(t *testing.T) { cfg := ResetConfig(t.Name()) state, privVals := makeGenesisState(t, genesisStateArgs{validators: 1}) - stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{DiscardABCIResponses: false}) cs := newState(t, cfg, state, privVals[0], kvstore.NewInMemoryApplication()) err := stateStore.Save(state) require.NoError(t, err) @@ -145,29 +144,31 @@ func TestMempoolRmBadTx(t *testing.T) { state, privVals := makeGenesisState(t, genesisStateArgs{validators: 1}) app := kvstore.NewInMemoryApplication() blockDB := dbm.NewMemDB() - stateStore := sm.NewStore(blockDB, sm.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := sm.NewStore(blockDB, sm.StoreOptions{DiscardABCIResponses: false}) cs := newState(t, cfg, state, privVals[0], app) err := stateStore.Save(state) require.NoError(t, err) // increment the 
counter by 1 - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(0)) - - res, err := app.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{Txs: [][]byte{kvstore.NewTx("key", "value")}}) + txBytes := kvstore.NewTx("key", "value") + res, err := app.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{Txs: [][]byte{txBytes}}) require.NoError(t, err) assert.False(t, res.TxResults[0].IsErr()) assert.True(t, len(res.AgreedAppData) > 0) + _, err = app.Commit(context.Background(), &abci.RequestCommit{}) + require.NoError(t, err) + emptyMempoolCh := make(chan struct{}) checkTxRespCh := make(chan struct{}) go func() { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.ResponseCheckTx) { + invalidTx := []byte("invalidTx") + err := assertMempool(cs.txNotifier).CheckTx(invalidTx, func(r *abci.ResponseCheckTx) { if r.Code != kvstore.CodeTypeInvalidTxFormat { - t.Errorf("expected checktx to return bad nonce, got %v", r) + t.Errorf("expected checktx to return invalid format, got %v", r) return } checkTxRespCh <- struct{}{} @@ -179,7 +180,7 @@ func TestMempoolRmBadTx(t *testing.T) { // check for the tx for { - txs := assertMempool(cs.txNotifier).ReapMaxBytesMaxGas(int64(len(txBytes)), -1) + txs := assertMempool(cs.txNotifier).ReapMaxBytesMaxGas(int64(len(invalidTx)), -1) if len(txs) == 0 { emptyMempoolCh <- struct{}{} return diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index cbc849f65..808738d8b 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -142,7 +142,7 @@ func TestReactorWithEvidence(t *testing.T) { for i := 0; i < nValidators; i++ { stateDB := dbm.NewMemDB() // each state needs its own db stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) cfg := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) defer os.RemoveAll(cfg.RootDir) diff --git a/consensus/replay.go b/consensus/replay.go index 450c1101d..ea9b2ff46 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -266,7 +266,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { } // Replay blocks up to the latest in the blockstore. 
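The one-line change below is easy to miss: ReplayBlocks can advance the application, so the hash it returns supersedes the one obtained from the Info handshake, and discarding it leaves later checks comparing against a stale value. A reduced, self-contained illustration of the pattern being fixed (hypothetical names, not the actual Handshaker API):

	package main

	import "fmt"

	// replay stands in for ReplayBlocks: it may move the app forward and
	// therefore hand back a different app hash than it was given.
	func replay(appHash string) (string, error) {
		return appHash + "+replayed", nil
	}

	func main() {
		appHash := "hash-from-info"
		_, _ = replay(appHash)       // buggy: the post-replay hash is discarded
		appHash, _ = replay(appHash) // fixed: adopt the hash replay returns
		fmt.Println(appHash)
	}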
- _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) + appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) if err != nil { return fmt.Errorf("error on replay: %v", err) } @@ -423,6 +423,13 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } + // NOTE: There is a rare edge case where a node has upgraded from + // v0.37 with endblock to v0.38 with finalize block and thus + // does not have the app hash saved from the previous height + // here we take the appHash provided from the Info handshake + if len(finalizeBlockResponse.AgreedAppData) == 0 { + finalizeBlockResponse.AgreedAppData = appHash + } mockApp := newMockProxyApp(finalizeBlockResponse) h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(state, storeBlockHeight, mockApp) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 905f363a0..c342c32bd 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -298,7 +298,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo tmos.Exit(err.Error()) } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) gdoc, err := sm.MakeGenesisDocFromFile(config.GenesisFile()) if err != nil { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index e94a15674..da8b30e16 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -20,8 +20,9 @@ import ( "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/abci/types/mocks" - "github.com/tendermint/tendermint/config" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" + cryptoenc "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/internal/test" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -30,6 +31,7 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" + smmocks "github.com/tendermint/tendermint/state/mocks" "github.com/tendermint/tendermint/types" ) @@ -48,8 +50,8 @@ import ( // and which ones we need the wal for - then we'd also be able to only flush the // wal writer when we need to, instead of with every message. 
-func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *config.Config, - lastBlockHeight int64, stateStore sm.Store) { +func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config, + lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) { logger := log.TestingLogger() state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile()) privValidator := loadPrivValidator(consensusReplayConfig) @@ -88,18 +90,14 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *config.Co } } -func sendTxs(ctx context.Context, cs *State) { +func sendTxs(ctx context.Context, t *testing.T, cs *State) { for i := 0; i < 256; i++ { select { case <-ctx.Done(): return default: tx := kvstore.NewTxFromID(i) - if err := assertMempool(cs.txNotifier).CheckTx(tx, func(resp *abci.ResponseCheckTx) { - if resp.Code != 0 { - panic(fmt.Sprintf("Unexpected code: %d, log: %s", resp.Code, resp.Log)) - } - }, mempool.TxInfo{}); err != nil { + if err := assertMempool(cs.txNotifier).CheckTx(tx, assertValidTx(t), mempool.TxInfo{}); err != nil { panic(err) } i++ @@ -119,7 +117,7 @@ func TestWALCrash(t *testing.T) { 1}, {"many non-empty blocks", func(stateDB dbm.DB, cs *State, ctx context.Context) { - go sendTxs(ctx, cs) + go sendTxs(ctx, t, cs) }, 3}, } @@ -133,7 +131,7 @@ func TestWALCrash(t *testing.T) { } } -func crashWALandCheckLiveness(t *testing.T, cfg *config.Config, +func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config, initFn func(dbm.DB, *State, context.Context), heightToStop int64) { walPanicked := make(chan error) crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop} @@ -145,16 +143,17 @@ LOOP: // create consensus state from a clean slate logger := log.NewNopLogger() - stateDB := dbm.NewMemDB() + blockDB := dbm.NewMemDB() + stateDB := blockDB stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) - state, err := sm.MakeGenesisStateFromFile(cfg.GenesisFile()) + state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) require.NoError(t, err) - privValidator := loadPrivValidator(cfg) + privValidator := loadPrivValidator(consensusReplayConfig) cs := newState( t, - cfg, + consensusReplayConfig, state, privValidator, kvstore.NewInMemoryApplication(), @@ -189,7 +188,7 @@ LOOP: t.Logf("WAL panicked: %v", err) // make sure we can make blocks after a crash - startNewStateAndWaitForBlock(t, cfg, cs.Height, stateStore) + startNewStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateStore) // stop consensus state and transactions sender (initFn) cs.Stop() //nolint:errcheck // Logging this error causes failure @@ -287,36 +286,251 @@ const numBlocks = 6 // Test handshake/replay // 0 - all synced up -// 1 - saved block but app and state are behind -// 2 - save block and committed but state is behind -// 3 - save block and committed with truncated block store and state behind +// 1 - saved block but app and state are behind by one height +// 2 - save block and committed (i.e. 
app got `Commit`) but state is behind +// 3 - same as 2 but with a truncated block store var modes = []uint{0, 1, 2, 3} +// Build a chain containing validator set changes +func setupChainWithChangingValidators(t *testing.T, config *cfg.Config, nBlocks int) ([]*types.Block, []*types.ExtendedCommit, sm.State) { + nPeers := 7 + nVals := 4 + css, _, _ := makeNetwork(t, makeNetworkArgs{ + config: config, + validators: nVals, + nonValidators: nPeers - nVals, + }) + genesisState := css[0].GetState() + + partSize := types.BlockPartSizeBytes + + newRoundCh := subscribe(css[0].eventBus, types.EventQueryNewRound) + proposalCh := subscribe(css[0].eventBus, types.EventQueryCompleteProposal) + + vss := make([]*validatorStub, nPeers) + for i := 0; i < nPeers; i++ { + vss[i] = newValidatorStub(css[i].privValidator, int32(i)) + } + height, round := css[0].Height, css[0].Round + + // start the machine + startTestRound(css[0], height, round) + incrementHeight(vss...) + ensureNewRound(newRoundCh, height, 0) + ensureNewProposal(proposalCh, height, round) + rs := css[0].GetRoundState() + blockID := types.NewBlockID(rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) + signAddVotes(t, css[0], tmproto.PrecommitType, blockID, vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 2 + height++ + incrementHeight(vss...) + newValidatorPubKey1, err := css[nVals].privValidator.GetPubKey() + require.NoError(t, err) + valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1) + require.NoError(t, err) + newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, assertValidTx(t), mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err := css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err := propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal := types.NewProposal(vss[1].Height, round, -1, blockID) + p := proposal.ToProto() + if err := vss[1].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + signAddVotes(t, css[0], tmproto.PrecommitType, blockID, vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 3 + height++ + incrementHeight(vss...) 
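+	// Height 3: resubmit the key of the validator added at height 2 with
+	// power 25, i.e. an update rather than an addition.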
+ updateValidatorPubKey1, err := css[nVals].privValidator.GetPubKey() + require.NoError(t, err) + updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1) + require.NoError(t, err) + updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) + err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, assertValidTx(t), mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal = types.NewProposal(vss[2].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[2].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + signAddVotes(t, css[0], tmproto.PrecommitType, blockID, vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 4 + height++ + incrementHeight(vss...) + newValidatorPubKey2, err := css[nVals+1].privValidator.GetPubKey() + require.NoError(t, err) + newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2) + require.NoError(t, err) + newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, assertValidTx(t), mempool.TxInfo{}) + assert.Nil(t, err) + newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey() + require.NoError(t, err) + newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3) + require.NoError(t, err) + newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, assertValidTx(t), mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal = types.NewProposal(vss[3].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[3].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + signAddVotes(t, css[0], tmproto.PrecommitType, blockID, vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 5 + height++ + incrementHeight(vss...) 
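+	// Height 5: drop the first of the two validators added at height 4 by
+	// resubmitting its key with zero voting power.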
+ removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, assertValidTx(t), mempool.TxInfo{}) + require.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal = types.NewProposal(vss[4].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[4].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + signAddVotes(t, css[0], tmproto.PrecommitType, blockID, vss[1:nVals+2]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 6 + height++ + incrementHeight(vss...) + removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, assertValidTx(t), mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal = types.NewProposal(vss[1].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[1].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + signAddVotes(t, css[0], tmproto.PrecommitType, blockID, vss[1:nVals+1]...) 
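+	// Wait for the final block to be committed, then read the whole chain
+	// and its extended commits back out of the proposer's block store.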
+	ensureNewRound(newRoundCh, height+1, 0)
+
+	chain := make([]*types.Block, 0)
+	extCommits := make([]*types.ExtendedCommit, 0)
+	for i := 1; i <= nBlocks; i++ {
+		chain = append(chain, css[0].blockStore.LoadBlock(int64(i)))
+		extCommits = append(extCommits, css[0].blockStore.LoadBlockExtendedCommit(int64(i)))
+	}
+	return chain, extCommits, genesisState
+}
+
 // Sync from scratch
 func TestHandshakeReplayAll(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, 0, m)
+		t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
+			testHandshakeReplay(t, 0, m, false)
+		})
+		t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
+			testHandshakeReplay(t, 0, m, true)
+		})
 	}
 }
 
 // Sync many, not from scratch
 func TestHandshakeReplaySome(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, 2, m)
+		t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
+			testHandshakeReplay(t, 2, m, false)
+		})
+		t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
+			testHandshakeReplay(t, 2, m, true)
+		})
 	}
 }
 
 // Sync from lagging by one
 func TestHandshakeReplayOne(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, numBlocks-1, m)
+		t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
+			testHandshakeReplay(t, numBlocks-1, m, false)
+		})
+		t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
+			testHandshakeReplay(t, numBlocks-1, m, true)
+		})
 	}
 }
 
 // Sync from caught up
 func TestHandshakeReplayNone(t *testing.T) {
 	for _, m := range modes {
-		testHandshakeReplay(t, numBlocks, m)
+		t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) {
+			testHandshakeReplay(t, numBlocks, m, false)
+		})
+		t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) {
+			testHandshakeReplay(t, numBlocks, m, true)
+		})
 	}
 }
 
@@ -337,8 +551,9 @@ func tempWALWithData(data []byte) string {
 
 // Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks -func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { +func testHandshakeReplay(t *testing.T, nBlocks int, mode uint, testValidatorsChange bool) { var ( + testConfig *cfg.Config chain []*types.Block extCommits []*types.ExtendedCommit store *mockBlockStore @@ -348,46 +563,56 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { evpool = sm.EmptyEvidencePool{} ) - testConfig := ResetConfig(fmt.Sprintf("handhsake_%d_%d", nBlocks, mode)) - t.Cleanup(func() { - _ = os.RemoveAll(testConfig.RootDir) - }) - walBody, err := WALWithNBlocks(t, numBlocks) - require.NoError(t, err) - walFile := tempWALWithData(walBody) - testConfig.Consensus.SetWalFile(walFile) + if testValidatorsChange { + testConfig = ResetConfig(fmt.Sprintf("%d_%d_m", nBlocks, mode)) + t.Cleanup(func() { + _ = os.RemoveAll(testConfig.RootDir) + }) + chain, extCommits, genesisState = setupChainWithChangingValidators(t, testConfig, numBlocks) + stateDB = dbm.NewMemDB() + store = newMockBlockStore(t, testConfig, genesisState.ConsensusParams) + } else { + testConfig = ResetConfig(fmt.Sprintf("%d_%d_s", nBlocks, mode)) + t.Cleanup(func() { + _ = os.RemoveAll(testConfig.RootDir) + }) + walBody, err := WALWithNBlocks(t, numBlocks, testConfig) + require.NoError(t, err) + walFile := tempWALWithData(walBody) + testConfig.Consensus.SetWalFile(walFile) - privVal := privval.LoadFilePV(testConfig.PrivValidatorKeyFile(), testConfig.PrivValidatorStateFile()) + privVal := privval.LoadFilePV(testConfig.PrivValidatorKeyFile(), testConfig.PrivValidatorStateFile()) - wal, err := NewWAL(walFile) - require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) - err = wal.Start() - require.NoError(t, err) - t.Cleanup(func() { - if err := wal.Stop(); err != nil { - t.Error(err) - } - }) - chain, extCommits, err = makeBlockchainFromWAL(wal) - require.NoError(t, err) - pubKey, err := privVal.GetPubKey() - require.NoError(t, err) - stateDB, genesisState, store = stateAndStore(t, testConfig, pubKey, kvstore.AppVersion) + wal, err := NewWAL(walFile) + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) + err = wal.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := wal.Stop(); err != nil { + t.Error(err) + } + }) + chain, extCommits, err = makeBlockchainFromWAL(wal) + require.NoError(t, err) + pubKey, err := privVal.GetPubKey() + require.NoError(t, err) + stateDB, genesisState, store = stateAndStore(t, testConfig, pubKey, kvstore.AppVersion) + } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) t.Cleanup(func() { _ = stateStore.Close() }) store.chain = chain + store.config = testConfig store.extCommits = extCommits state := genesisState.Copy() // run the chain through state.ApplyBlock to build up the tendermint state - state = buildTMStateFromChain(t, testConfig, stateStore, mempool, evpool, state, chain, nBlocks, mode, store) - latestAppHash := state.AppHash + state, latestAppHash := buildTMStateFromChain(t, testConfig, stateStore, mempool, evpool, state, chain, nBlocks, mode, store) // make a new client creator kvstoreApp := kvstore.NewPersistentApplication( @@ -402,12 +627,12 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { // use a throwaway tendermint state proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() - stateStore := sm.NewStore(stateDB1, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + 
dummyStateStore := sm.NewStore(stateDB1, sm.StoreOptions{ + DiscardABCIResponses: false, }) - err := stateStore.Save(genesisState) + err := dummyStateStore.Save(genesisState) require.NoError(t, err) - buildAppStateFromChain(t, proxyApp, stateStore, mempool, evpool, genesisState, chain, nBlocks, mode, store) + buildAppStateFromChain(t, proxyApp, dummyStateStore, mempool, evpool, genesisState, chain, nBlocks, mode, store) } // Prune block store if requested @@ -420,7 +645,8 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { } // now start the app using the handshake - it should sync - genDoc, _ := sm.MakeGenesisDocFromFile(testConfig.GenesisFile()) + genDoc, err := sm.MakeGenesisDocFromFile(testConfig.GenesisFile()) + require.NoError(t, err) handshaker := NewHandshaker(stateStore, state, store, genDoc) proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { @@ -433,19 +659,27 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { } }) + // perform the replay protocol to sync Tendermint and the application err = handshaker.Handshake(proxyApp) if expectError { require.Error(t, err) + // finish the test early return - } else if err != nil { - t.Fatalf("Error on abci handshake: %v", err) } + require.NoError(t, err) // get the latest app hash from the app - res, err := proxyApp.Query().Info(context.Background(), &abci.RequestInfo{Version: ""}) - if err != nil { - t.Fatal(err) - } + res, err := proxyApp.Query().Info(context.Background(), proxy.RequestInfo) + require.NoError(t, err) + + // block store and app height should be in sync + require.Equal(t, store.Height(), res.LastBlockHeight) + + // tendermint state height and app height should be in sync + state, err = stateStore.Load() + require.NoError(t, err) + require.Equal(t, state.LastBlockHeight, res.LastBlockHeight) + require.Equal(t, int64(numBlocks), res.LastBlockHeight) // the app hash should be synced up if !bytes.Equal(latestAppHash, res.LastBlockAppHash) { @@ -509,9 +743,12 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs) } + // mode 1 only the block at the last height is saved + // mode 2 and 3, the block is saved, commit is called, but the state is not saved if mode == 2 || mode == 3 { // update the kvstore height and apphash // as if we ran commit but not + // here we expect a dummy state store to be used state = applyBlock(t, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, bs) } default: @@ -522,7 +759,7 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm func buildTMStateFromChain( t *testing.T, - cfg *config.Config, + config *cfg.Config, stateStore sm.Store, mempool mempool.Mempool, evpool sm.EvidencePool, @@ -530,11 +767,11 @@ func buildTMStateFromChain( chain []*types.Block, nBlocks int, mode uint, - bs sm.BlockStore) sm.State { + bs sm.BlockStore) (sm.State, []byte) { // run the whole chain against this client to build up the tendermint state clientCreator := proxy.NewLocalClientCreator( kvstore.NewPersistentApplication( - filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))) + filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))) proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { panic(err) @@ -557,6 +794,7 @@ func buildTMStateFromChain( for _, block := range chain { state = 
applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs) } + return state, state.AppHash case 1, 2, 3: // sync up to the penultimate as if we stored the block. @@ -565,14 +803,24 @@ func buildTMStateFromChain( state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs) } + dummyStateStore := &smmocks.Store{} + lastHeight := int64(len(chain)) + penultimateHeight := int64(len(chain) - 1) + vals, _ := stateStore.LoadValidators(penultimateHeight) + dummyStateStore.On("LoadValidators", penultimateHeight).Return(vals, nil) + dummyStateStore.On("Save", mock.Anything).Return(nil) + dummyStateStore.On("SaveFinalizeBlockResponse", lastHeight, mock.MatchedBy(func(response *abci.ResponseFinalizeBlock) bool { + require.NoError(t, stateStore.SaveFinalizeBlockResponse(lastHeight, response)) + return true + })).Return(nil) + // apply the final block to a state copy so we can // get the right next appHash but keep the state back - applyBlock(t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, bs) + s := applyBlock(t, dummyStateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, bs) + return state, s.AppHash default: panic(fmt.Sprintf("unknown mode %v", mode)) } - - return state } func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { @@ -588,7 +836,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { require.NoError(t, err) stateDB, state, store := stateAndStore(t, config, pubKey, appVersion) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) state.LastValidators = state.Validators.Copy() @@ -647,6 +895,12 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { } } +func assertValidTx(t *testing.T) func(resp *abci.ResponseCheckTx) { + return func(resp *abci.ResponseCheckTx) { + require.EqualValues(t, 0, resp.Code) + } +} + type badApp struct { abci.BaseApplication numBlocks byte @@ -803,18 +1057,18 @@ func readPieceFromWAL(msg *TimedWALMessage) interface{} { // fresh state and mock store func stateAndStore( t *testing.T, - cfg *config.Config, + config *cfg.Config, pubKey crypto.PubKey, appVersion uint64, ) (dbm.DB, sm.State, *mockBlockStore) { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) - state, err := sm.MakeGenesisStateFromFile(cfg.GenesisFile()) + state, err := sm.MakeGenesisStateFromFile(config.GenesisFile()) require.NoError(t, err) state.Version.Consensus.App = appVersion - store := newMockBlockStore(t, cfg, state.ConsensusParams) + store := newMockBlockStore(t, config, state.ConsensusParams) require.NoError(t, stateStore.Save(state)) return stateDB, state, store @@ -823,8 +1077,10 @@ func stateAndStore( //---------------------------------- // mock block store +var _ sm.BlockStore = &mockBlockStore{} + type mockBlockStore struct { - cfg *config.Config + config *cfg.Config params types.ConsensusParams chain []*types.Block extCommits []*types.ExtendedCommit @@ -833,9 +1089,9 @@ type mockBlockStore struct { } // TODO: NewBlockStore(db.NewMemDB) ... 
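One small but useful addition above is `var _ sm.BlockStore = &mockBlockStore{}`: a compile-time assertion that the mock keeps satisfying the interface it stands in for. The idiom in isolation (hypothetical types, not from this patch):

	package main

	type blockStore interface{ Height() int64 }

	type mock struct{}

	func (mock) Height() int64 { return 0 }

	// Fails to compile the moment mock and blockStore drift apart.
	var _ blockStore = mock{}

	func main() {}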
-func newMockBlockStore(t *testing.T, cfg *config.Config, params types.ConsensusParams) *mockBlockStore { +func newMockBlockStore(t *testing.T, config *cfg.Config, params types.ConsensusParams) *mockBlockStore { return &mockBlockStore{ - cfg: cfg, + config: config, params: params, t: t, } } @@ -910,7 +1166,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) { require.NoError(t, err) stateDB, state, store := stateAndStore(t, config, pubKey, 0x0) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) oldValAddr := state.Validators.Validators[0].Address diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index a84ddc0b6..4501bb42d 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -28,9 +28,7 @@ import ( // persistent kvstore application and special consensus wal instance // (byteBufferWAL) and waits until numBlocks are created. // If the node fails to produce the given numBlocks, it returns an error. -func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { - config := getConfig(t) - +func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int, config *cfg.Config) (err error) { app := kvstore.NewPersistentApplication(filepath.Join(config.DBDir(), "wal_generator")) logger := log.TestingLogger().With("wal_generator", "wal_generator") @@ -49,7 +47,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { blockStoreDB := db.NewMemDB() stateDB := blockStoreDB stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := sm.MakeGenesisState(genDoc) if err != nil { @@ -123,11 +121,11 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { } // WALWithNBlocks returns the WAL content for numBlocks blocks. -func WALWithNBlocks(t *testing.T, numBlocks int) (data []byte, err error) { +func WALWithNBlocks(t *testing.T, numBlocks int, config *cfg.Config) (data []byte, err error) { var b bytes.Buffer wr := bufio.NewWriter(&b) - if err := WALGenerateNBlocks(t, wr, numBlocks); err != nil { + if err := WALGenerateNBlocks(t, wr, numBlocks, config); err != nil { return []byte{}, err } diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 12b775b41..80367c000 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -56,7 +56,7 @@ func TestWALTruncate(t *testing.T) { // 60 blocks come to nearly 70K, greater than the group's headBuf size (4096 * 10). // When headBuf is full, its content is flushed to the file; at that point // RotateFile is called, so truncated content exists in each file.
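Since the WAL generators above now take the config as a parameter instead of calling getConfig internally, every call site has to thread it through explicitly. A minimal sketch in the style of the wal_test.go call sites that follow; it assumes it lives in the same test package, where getConfig(t) is the existing test-config helper:

```go
// a minimal smoke test, assuming the consensus test package's helpers
func TestWALWithNBlocksSmoke(t *testing.T) {
	// the config is injected rather than created inside the generator
	walBody, err := WALWithNBlocks(t, 3, getConfig(t))
	require.NoError(t, err)
	require.NotEmpty(t, walBody)
}
```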
- err = WALGenerateNBlocks(t, wal.Group(), 60) + err = WALGenerateNBlocks(t, wal.Group(), 60, getConfig(t)) require.NoError(t, err) time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run @@ -150,7 +150,7 @@ func TestWALWrite(t *testing.T) { } func TestWALSearchForEndHeight(t *testing.T) { - walBody, err := WALWithNBlocks(t, 6) + walBody, err := WALWithNBlocks(t, 6, getConfig(t)) if err != nil { t.Fatal(err) } @@ -188,7 +188,7 @@ func TestWALPeriodicSync(t *testing.T) { wal.SetLogger(log.TestingLogger()) // Generate some data - err = WALGenerateNBlocks(t, wal.Group(), 5) + err = WALGenerateNBlocks(t, wal.Group(), 5, getConfig(t)) require.NoError(t, err) // We should have data in the buffer now diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 3b5c0626e..8283bb2f9 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -359,7 +359,7 @@ func TestRecoverPendingEvidence(t *testing.T) { func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) sm.Store { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state := sm.State{ ChainID: evidenceChainID, diff --git a/light/detector.go b/light/detector.go index 1fd21f41e..dd56d9428 100644 --- a/light/detector.go +++ b/light/detector.go @@ -34,7 +34,7 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig lastVerifiedHeader = primaryTrace[len(primaryTrace)-1].SignedHeader witnessesToRemove = make([]int, 0) ) - c.logger.Debug("Running detector against trace", "endBlockHeight", lastVerifiedHeader.Height, + c.logger.Debug("Running detector against trace", "finalizeBlockHeight", lastVerifiedHeader.Height, "endBlockHash", lastVerifiedHeader.Hash, "length", len(primaryTrace)) c.providerMutex.Lock() diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index f1e323c79..9fb23c52a 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -664,9 +664,9 @@ func (mem *CListMempool) recheckTxs() { } } - if err := mem.proxyAppConn.Flush(context.TODO()); err != nil { - mem.logger.Error("recheckTx flush", err, "err") - } + // In diff --git a/state/indexer/block/kv/kv.go b/state/indexer/block/kv/kv.go --- a/state/indexer/block/kv/kv.go +++ b/state/indexer/block/kv/kv.go // primary key: encode(block.height | height) => encode(height) -// BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height) -// EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height) +// FinalizeBlock events: encode(eventType.eventAttr|eventValue|height|finalize_block) => encode(height) func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockEvents) error { batch := idx.store.NewBatch() defer batch.Close() } @@ -66,15 +65,15 @@ func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockEvents) error { } // 2. index block events - if err := idx.indexEvents(batch, bh.Events, "begin_block", height); err != nil { - return fmt.Errorf("failed to index BeginBlock events: %w", err) + if err := idx.indexEvents(batch, bh.Events, "finalize_block", height); err != nil { + return fmt.Errorf("failed to index FinalizeBlock events: %w", err) } return batch.WriteSync() } -// Search performs a query for block heights that match a given BeginBlock -// and Endblock event search criteria. The given query can match against zero, +// Search performs a query for block heights that match a given FinalizeBlock +// event search criteria. The given query can match against zero, // one or more block heights. In the case of height queries, i.e.
block.height=H, // if the height is indexed, that height alone will be returned. On failure, an error and // nil slice are returned. Otherwise, a non-nil slice and nil error are returned. diff --git a/state/indexer/sink/psql/backport.go b/state/indexer/sink/psql/backport.go index ce4a137db..68efa2aae 100644 --- a/state/indexer/sink/psql/backport.go +++ b/state/indexer/sink/psql/backport.go @@ -24,8 +24,7 @@ import ( ) const ( - eventTypeBeginBlock = "begin_block" - eventTypeEndBlock = "end_block" + eventTypeFinalizeBlock = "finalize_block" ) // TxIndexer returns a bridge from es to the Tendermint v0.34 transaction indexer. diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index 0c71541e5..a2417c8ac 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -165,7 +165,7 @@ INSERT INTO `+tableBlocks+` (height, chain_id, created_at) } // Insert all the block events. Order is important here, if err := insertEvents(dbtx, blockID, 0, h.Events); err != nil { - return fmt.Errorf("begin-block events: %w", err) + return fmt.Errorf("finalize-block events: %w", err) } return nil }) diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 2ee777611..b08c2dbf3 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -351,17 +351,8 @@ SELECT height FROM `+tableBlocks+` WHERE height = $1; if err := testDB().QueryRow(` SELECT type, height, chain_id FROM `+viewBlockEvents+` WHERE height = $1 AND type = $2 AND chain_id = $3; -`, height, eventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows { - t.Errorf("No %q event found for height=%d", eventTypeBeginBlock, height) - } else if err != nil { - t.Fatalf("Database query failed: %v", err) - } - - if err := testDB().QueryRow(` -SELECT type, height, chain_id FROM `+viewBlockEvents+` - WHERE height = $1 AND type = $2 AND chain_id = $3; -`, height, eventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows { - t.Errorf("No %q event found for height=%d", eventTypeEndBlock, height) +`, height, eventTypeFinalizeBlock, chainID).Err(); err == sql.ErrNoRows { + t.Errorf("No %q event found for height=%d", eventTypeFinalizeBlock, height) } else if err != nil { t.Fatalf("Database query failed: %v", err) } diff --git a/state/metrics.gen.go b/state/metrics.gen.go index 1ce2c4de1..512bb7a37 100644 --- a/state/metrics.gen.go +++ b/state/metrics.gen.go @@ -18,7 +18,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Namespace: namespace, Subsystem: MetricsSubsystem, Name: "block_processing_time", - Help: "Time between BeginBlock and EndBlock in ms.", + Help: "Time spent processing FinalizeBlock.", Buckets: stdprometheus.LinearBuckets(1, 10, 10), }, labels).With(labelsAndValues...), diff --git a/state/metrics.go b/state/metrics.go index 6c238df76..a37515668 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -14,7 +14,7 @@ const ( // Metrics contains metrics exposed by this package. type Metrics struct { - // Time between BeginBlock and EndBlock in ms.
+ // Time spent processing FinalizeBlock BlockProcessingTime metrics.Histogram `metrics_buckettype:"lin" metrics_bucketsizes:"1, 10, 10"` // ConsensusParamUpdates is the total number of times the application has diff --git a/state/rollback_test.go b/state/rollback_test.go index 08a7ea9c3..9e2d03efc 100644 --- a/state/rollback_test.go +++ b/state/rollback_test.go @@ -88,7 +88,7 @@ func TestRollback(t *testing.T) { func TestRollbackHard(t *testing.T) { const height int64 = 100 blockStore := store.NewBlockStore(dbm.NewMemDB()) - stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardABCIResponses: false}) valSet, _ := types.RandValidatorSet(5, 10) @@ -204,7 +204,7 @@ func TestRollbackHard(t *testing.T) { func TestRollbackNoState(t *testing.T) { stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := &mocks.BlockStore{} @@ -238,7 +238,7 @@ func TestRollbackDifferentStateHeight(t *testing.T) { } func setupStateStore(t *testing.T, height int64) state.Store { - stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardABCIResponses: false}) valSet, _ := types.RandValidatorSet(5, 10) params := types.DefaultConsensusParams() diff --git a/state/state.go b/state/state.go index 51ce5a3f8..171a4e713 100644 --- a/state/state.go +++ b/state/state.go @@ -68,7 +68,7 @@ type State struct { LastHeightValidatorsChanged int64 // Consensus parameters used for validating blocks. - // Changes returned by EndBlock and updated after Commit. + // Changes returned by FinalizeBlock and updated after Commit. ConsensusParams types.ConsensusParams LastHeightConsensusParamsChanged int64 diff --git a/state/state_test.go b/state/state_test.go index dc14b6bbb..d8ddf26d1 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -28,7 +28,7 @@ func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, sm.State) { dbType := dbm.BackendType(config.DBBackend) stateDB, err := dbm.NewDB("state", dbType, config.DBDir()) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) require.NoError(t, err) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) @@ -77,7 +77,7 @@ func TestStateSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) assert := assert.New(t) @@ -98,7 +98,7 @@ func TestFinalizeBlockResponsesSaveLoad1(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) assert := assert.New(t) @@ -131,7 +131,7 @@ func TestFinalizeBlockResponsesSaveLoad2(t *testing.T) { assert := assert.New(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) cases := [...]struct { @@ -219,7 +219,7 @@ func TestValidatorSimpleSaveLoad(t *testing.T) { assert := assert.New(t) statestore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) // Can't load anything for height 0. 
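The DiscardABCIResponses flag these tests keep setting to false is the storage/queryability trade-off behind the rename: with it enabled, only the response object for the latest height survives, and per-height lookups fail fast. A rough sketch of the intended behavior, based only on the state/store.go hunks below (import paths assumed from the rest of this diff):

```go
package example

import (
	dbm "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	sm "github.com/tendermint/tendermint/state"
)

// demoDiscard shows the option's semantics: saves succeed at every
// height, but per-height loads are refused when responses are discarded.
func demoDiscard() error {
	stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{
		DiscardABCIResponses: true, // keep only the latest response object
	})

	resp := &abci.ResponseFinalizeBlock{}
	if err := stateStore.SaveFinalizeBlockResponse(10, resp); err != nil {
		return err
	}

	// Expected to fail with ErrFinalizeBlockResponsesNotPersisted,
	// as returned by LoadFinalizeBlockResponse in this diff.
	_, err := stateStore.LoadFinalizeBlockResponse(10)
	return err
}
```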
@@ -254,7 +254,7 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) // Change vals at these heights. @@ -912,7 +912,7 @@ func TestStoreLoadValidatorsIncrementsProposerPriority(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) t.Cleanup(func() { tearDown(t) }) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state.Validators = genValSet(valSetSize) state.NextValidators = state.Validators.CopyIncrementProposerPriority(1) @@ -939,7 +939,7 @@ func TestManyValidatorChangesSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) require.Equal(t, int64(0), state.LastBlockHeight) state.Validators = genValSet(valSetSize) @@ -1005,7 +1005,7 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) { defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) // Change vals at these heights. diff --git a/state/store.go b/state/store.go index bcb293d45..f2da03ec3 100644 --- a/state/store.go +++ b/state/store.go @@ -84,11 +84,11 @@ type dbStore struct { } type StoreOptions struct { - // DiscardFinalizeBlockResponses determines whether or not the store - // retains all ABCIResponses. If DiscardFinalizeBlockResponses is enabled, + // DiscardABCIResponses determines whether or not the store + // retains all ABCIResponses. If DiscardABCIResponses is enabled, // the store will maintain only the response object from the latest // height. - DiscardFinalizeBlockResponses bool + DiscardABCIResponses bool } var _ Store = (*dbStore)(nil) @@ -375,11 +375,11 @@ func TxResultsHash(txResults []*abci.ExecTxResult) []byte { return types.NewResults(txResults).Hash() } -// LoadFinalizeBlockResponse loads the DiscardFinalizeBlockResponses for the given height from the +// LoadFinalizeBlockResponse loads the FinalizeBlock responses for the given height from the // database. If DiscardABCIResponses is set to true, ErrFinalizeBlockResponsesNotPersisted // is returned. If not found, ErrNoABCIResponsesForHeight is returned. func (store dbStore) LoadFinalizeBlockResponse(height int64) (*abci.ResponseFinalizeBlock, error) { - if store.DiscardFinalizeBlockResponses { + if store.DiscardABCIResponses { return nil, ErrFinalizeBlockResponsesNotPersisted } @@ -445,7 +445,26 @@ func (store dbStore) LoadLastFinalizeBlockResponse(height int64) (*abci.Response // Here we validate the result by comparing its height to the expected height. if height != info.GetHeight() { - return nil, errors.New("expected height %d but last stored abci responses was at height %d") + return nil, fmt.Errorf("expected height %d but the last stored abci response was at height %d", height, info.GetHeight()) + } + + // It is possible, if this is called directly after an upgrade, that + // ResponseFinalizeBlock is nil,
in which case we use the legacy + // ABCI responses + if info.ResponseFinalizeBlock == nil { + // sanity check + if info.LegacyAbciResponses == nil { + panic("state store contains last abci response but it is empty") + } + legacyResp := info.LegacyAbciResponses + return &abci.ResponseFinalizeBlock{ + TxResults: legacyResp.DeliverTxs, + ValidatorUpdates: legacyResp.EndBlock.ValidatorUpdates, + ConsensusParamUpdates: legacyResp.EndBlock.ConsensusParamUpdates, + Events: append(legacyResp.BeginBlock.Events, legacyResp.EndBlock.Events...), + // NOTE: AgreedAppData is missing in the response but will + // be caught and filled in consensus/replay.go + }, nil } return info.ResponseFinalizeBlock, nil @@ -469,7 +488,7 @@ func (store dbStore) SaveFinalizeBlockResponse(height int64, resp *abci.Response // If the flag is false then we save the ABCIResponse. This can be used for the /BlockResults // query or to reindex an event using the command line. - if !store.DiscardFinalizeBlockResponses { + if !store.DiscardABCIResponses { bz, err := resp.Marshal() if err != nil { return err diff --git a/state/store_test.go b/state/store_test.go index ec042886f..4d7847046 100644 --- a/state/store_test.go +++ b/state/store_test.go @@ -22,7 +22,7 @@ import ( func TestStoreLoadValidators(t *testing.T) { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) val, _ := types.RandValidator(true, 10) vals := types.NewValidatorSet([]*types.Validator{val}) @@ -55,7 +55,7 @@ func BenchmarkLoadValidators(b *testing.B) { stateDB, err := dbm.NewDB("state", dbType, config.DBDir()) require.NoError(b, err) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) if err != nil { @@ -112,7 +112,7 @@ func TestPruneStates(t *testing.T) { t.Run(name, func(t *testing.T) { db := dbm.NewMemDB() stateStore := sm.NewStore(db, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) pk := ed25519.GenPrivKey().PubKey() @@ -214,11 +214,11 @@ func TestTxResultsHash(t *testing.T) { root := sm.TxResultsHash(txResults) - // root should be Merkle tree root of DeliverTxs responses + // root should be Merkle tree root of ExecTxResult responses results := types.NewResults(txResults) assert.Equal(t, root, results.Hash()) - // test we can prove first DeliverTx + // test we can prove first ExecTxResult proof := results.ProveResult(0) bz, err := results[0].Marshal() require.NoError(t, err) @@ -238,7 +238,7 @@ func TestLastFinalizeBlockResponses(t *testing.T) { t.Run("Not persisting responses", func(t *testing.T) { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) responses, err := stateStore.LoadFinalizeBlockResponse(1) require.Error(t, err) @@ -251,7 +251,7 } // create a new db and state store, and set DiscardABCIResponses to false. stateDB = dbm.NewMemDB() - stateStore = sm.NewStore(stateDB, sm.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore = sm.NewStore(stateDB, sm.StoreOptions{DiscardABCIResponses: false}) height := int64(10) // save the last abci response.
err = stateStore.SaveFinalizeBlockResponse(height, response1) @@ -281,7 +281,7 @@ func TestLastFinalizeBlockResponses(t *testing.T) { } // create a new state store with DiscardABCIResponses turned on. stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: true, + DiscardABCIResponses: true, }) // save an additional response. err := stateStore.SaveFinalizeBlockResponse(height+1, response2) diff --git a/state/tx_filter_test.go b/state/tx_filter_test.go index 52cc1ab31..d5ab761ac 100644 --- a/state/tx_filter_test.go +++ b/state/tx_filter_test.go @@ -34,7 +34,7 @@ func TestTxFilter(t *testing.T) { stateDB, err := dbm.NewDB("state", "memdb", os.TempDir()) require.NoError(t, err) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) require.NoError(t, err) diff --git a/state/validation_test.go b/state/validation_test.go index fc6dd98a6..9c648bab4 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -32,7 +32,7 @@ func TestValidateBlockHeader(t *testing.T) { state, stateDB, privVals := makeState(3, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) mp := &mpmocks.Mempool{} mp.On("Lock").Return() @@ -127,7 +127,7 @@ func TestValidateBlockCommit(t *testing.T) { state, stateDB, privVals := makeState(1, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) mp := &mpmocks.Mempool{} mp.On("Lock").Return() @@ -263,7 +263,7 @@ func TestValidateBlockEvidence(t *testing.T) { state, stateDB, privVals := makeState(4, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) diff --git a/store/store_test.go b/store/store_test.go index 529259153..15ff3f45f 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -53,7 +53,7 @@ func makeStateAndBlockStore(t *testing.T) (sm.State, dbm.DB, *BlockStore) { blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) if err != nil { @@ -449,7 +449,7 @@ func TestLoadBaseMeta(t *testing.T) { config := test.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) @@ -516,7 +516,7 @@ func TestPruneBlocks(t *testing.T) { config := test.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) @@ -656,7 +656,7 @@ func TestLoadBlockMetaByHash(t *testing.T) { config := test.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile())
require.NoError(t, err) diff --git a/types/block.go b/types/block.go index b30ca829d..0b20d098f 100644 --- a/types/block.go +++ b/types/block.go @@ -342,7 +342,7 @@ type Header struct { ConsensusHash tmbytes.HexBytes `json:"consensus_hash"` // consensus params for current block AppHash tmbytes.HexBytes `json:"app_hash"` // state after txs from the previous block // root hash of all results from the txs from the previous block - // see `deterministicResponseDeliverTx` to understand which parts of a tx is hashed into here + // see `deterministicExecTxResult` to understand which parts of a tx is hashed into here LastResultsHash tmbytes.HexBytes `json:"last_results_hash"` // consensus info diff --git a/types/validation.go b/types/validation.go index 3b33e90db..3601f0479 100644 --- a/types/validation.go +++ b/types/validation.go @@ -19,7 +19,7 @@ func shouldBatchVerify(vals *ValidatorSet, commit *Commit) bool { // // It checks all the signatures! While it's safe to exit as soon as we have // 2/3+ signatures, doing so would impact incentivization logic in the ABCI -// application that depends on the LastCommitInfo sent in BeginBlock, which +// application that depends on the LastCommitInfo sent in FinalizeBlock, which // includes which validators signed. For instance, Gaia incentivizes proposers // with a bonus for including more than +2/3 of the signatures. func VerifyCommit(chainID string, vals *ValidatorSet, blockID BlockID,
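Since the VerifyCommit comment now points at FinalizeBlock rather than BeginBlock, it may help to sketch what the incentivization logic it protects looks like on the application side. The DecidedLastCommit and VoteInfo field names below are assumptions based on this branch's ABCI types, not a verified API; later versions replace the boolean with a BlockIdFlag:

```go
package example

import (
	abci "github.com/tendermint/tendermint/abci/types"
)

// countSigners tallies which validators signed the last block -- the kind
// of logic that would break if VerifyCommit stopped checking every
// signature. Field names are assumed from this branch's ABCI types.
func countSigners(req *abci.RequestFinalizeBlock) int {
	signed := 0
	for _, vote := range req.DecidedLastCommit.Votes {
		if vote.SignedLastBlock { // assumed field; later versions use BlockIdFlag
			signed++
		}
	}
	return signed
}
```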