From 001cd50fc7430851d57cf074866446df36febd13 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 17 Nov 2022 14:57:38 +0100 Subject: [PATCH 01/16] format generated protos --- abci/types/types.pb.go | 1 + proto/tendermint/crypto/keys.pb.go | 1 + proto/tendermint/mempool/types.pb.go | 1 + proto/tendermint/p2p/conn.pb.go | 1 + proto/tendermint/statesync/types.pb.go | 1 + 5 files changed, 5 insertions(+) diff --git a/abci/types/types.pb.go b/abci/types/types.pb.go index 5f1bf8d3b..da8ac16aa 100644 --- a/abci/types/types.pb.go +++ b/abci/types/types.pb.go @@ -1420,6 +1420,7 @@ func (m *RequestProcessProposal) GetProposerAddress() []byte { type Response struct { // Types that are valid to be assigned to Value: + // // *Response_Exception // *Response_Echo // *Response_Flush diff --git a/proto/tendermint/crypto/keys.pb.go b/proto/tendermint/crypto/keys.pb.go index bbd97d446..cfd176c6e 100644 --- a/proto/tendermint/crypto/keys.pb.go +++ b/proto/tendermint/crypto/keys.pb.go @@ -27,6 +27,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package // PublicKey defines the keys available for use with Tendermint Validators type PublicKey struct { // Types that are valid to be assigned to Sum: + // // *PublicKey_Ed25519 // *PublicKey_Secp256K1 Sum isPublicKey_Sum `protobuf_oneof:"sum"` diff --git a/proto/tendermint/mempool/types.pb.go b/proto/tendermint/mempool/types.pb.go index f7d16a64c..e63d67611 100644 --- a/proto/tendermint/mempool/types.pb.go +++ b/proto/tendermint/mempool/types.pb.go @@ -68,6 +68,7 @@ func (m *Txs) GetTxs() [][]byte { type Message struct { // Types that are valid to be assigned to Sum: + // // *Message_Txs Sum isMessage_Sum `protobuf_oneof:"sum"` } diff --git a/proto/tendermint/p2p/conn.pb.go b/proto/tendermint/p2p/conn.pb.go index dccdbe4a0..b2ace8eb8 100644 --- a/proto/tendermint/p2p/conn.pb.go +++ b/proto/tendermint/p2p/conn.pb.go @@ -158,6 +158,7 @@ func (m *PacketMsg) GetData() []byte { type Packet struct { // Types that are valid to be assigned to Sum: + // // *Packet_PacketPing // *Packet_PacketPong // *Packet_PacketMsg diff --git a/proto/tendermint/statesync/types.pb.go b/proto/tendermint/statesync/types.pb.go index 147e0f7d4..9ec4a1f36 100644 --- a/proto/tendermint/statesync/types.pb.go +++ b/proto/tendermint/statesync/types.pb.go @@ -24,6 +24,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Message struct { // Types that are valid to be assigned to Sum: + // // *Message_SnapshotsRequest // *Message_SnapshotsResponse // *Message_ChunkRequest From 7c857be3b58fd539ca676a372ba96bf41d1516f0 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Mon, 21 Nov 2022 12:42:31 +0100 Subject: [PATCH 02/16] revert modifications to flush logic --- abci/client/grpc_client.go | 7 +- abci/client/socket_client.go | 129 ++++++++++++++++++------------ abci/client/socket_client_test.go | 47 ++++++++++- abci/server/socket_server.go | 12 +-- 4 files changed, 132 insertions(+), 63 deletions(-) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 05eaf4ac5..26fbf6337 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -125,6 +125,7 @@ func (cli *grpcClient) OnStop() { if cli.conn != nil { cli.conn.Close() } + close(cli.chReqRes) } func (cli *grpcClient) StopForError(err error) { @@ -147,7 +148,6 @@ func (cli *grpcClient) StopForError(err error) { func (cli *grpcClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() - return cli.err } @@ -181,7 +181,10 @@ func (cli *grpcClient) 
finishAsyncCall(req *types.Request, res *types.Response) //---------------------------------------- -func (cli *grpcClient) Flush(ctx context.Context) error { return nil } +func (cli *grpcClient) Flush(ctx context.Context) error { + _, err := cli.client.Flush(ctx, types.ToRequestFlush().GetFlush(), grpc.WaitForReady(true)) + return err +} func (cli *grpcClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { return cli.client.Echo(ctx, types.ToRequestEcho(msg).GetEcho(), grpc.WaitForReady(true)) diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index de89c3ba9..aee9c64ac 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -14,10 +14,12 @@ import ( "github.com/tendermint/tendermint/abci/types" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/libs/timer" ) const ( - reqQueueSize = 256 // TODO make configurable + reqQueueSize = 256 // TODO make configurable + flushThrottleMS = 20 // Don't wait longer than... ) // socketClient is the client side implementation of the Tendermint @@ -26,8 +28,6 @@ const ( // // This is goroutine-safe. All calls are serialized to the server through an unbuffered queue. The socketClient // tracks responses and expects them to respect the order of the requests sent. -// -// The buffer is flushed after every message sent. type socketClient struct { service.BaseService @@ -35,7 +35,8 @@ type socketClient struct { mustConnect bool conn net.Conn - reqQueue chan *ReqRes + reqQueue chan *ReqRes + flushTimer *timer.ThrottleTimer mtx sync.Mutex err error @@ -51,10 +52,12 @@ var _ Client = (*socketClient)(nil) func NewSocketClient(addr string, mustConnect bool) Client { cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), + flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, - addr: addr, - reqSent: list.New(), - resCb: nil, + + addr: addr, + reqSent: list.New(), + resCb: nil, } cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) return cli @@ -95,6 +98,7 @@ func (cli *socketClient) OnStop() { } cli.flushQueue() + cli.flushTimer.Stop() } // Error returns an error if the client was stopped abruptly. @@ -123,26 +127,37 @@ func (cli *socketClient) CheckTxAsync(ctx context.Context, req *types.RequestChe //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - bw := bufio.NewWriter(conn) + w := bufio.NewWriter(conn) for { select { - case <-cli.Quit(): - return case reqres := <-cli.reqQueue: // N.B. We must enqueue before sending out the request, otherwise the // server may reply before we do it, and the receiver will fail for an // unsolicited reply. cli.trackRequest(reqres) - if err := types.WriteMessage(reqres.Request, bw); err != nil { + err := types.WriteMessage(reqres.Request, w) + if err != nil { cli.stopForError(fmt.Errorf("write to buffer: %w", err)) return } - if err := bw.Flush(); err != nil { - cli.stopForError(fmt.Errorf("flush buffer: %w", err)) - return + // If it's a flush request, flush the current buffer. + if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { + err = w.Flush() + if err != nil { + cli.stopForError(fmt.Errorf("flush buffer: %w", err)) + return + } } + case <-cli.flushTimer.Ch: // flush queue + select { + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): + default: + // Probably will fill the buffer, or retry later. 
+ } + case <-cli.Quit(): + return } } } @@ -155,8 +170,8 @@ func (cli *socketClient) recvResponseRoutine(conn io.Reader) { } var res = &types.Response{} - - if err := types.ReadMessage(r, res); err != nil { + err := types.ReadMessage(r, res) + if err != nil { cli.stopForError(fmt.Errorf("read message: %w", err)) return } @@ -221,43 +236,6 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { return nil } -func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) { - reqres := NewReqRes(req) - - // TODO: set cli.err if reqQueue times out - select { - case cli.reqQueue <- reqres: - case <-ctx.Done(): - return nil, ctx.Err() - } - - return reqres, nil -} - -// flushQueue marks as complete and discards all remaining pending requests -// from the queue. -func (cli *socketClient) flushQueue() { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // mark all in-flight messages as resolved (they will get cli.Error()) - for req := cli.reqSent.Front(); req != nil; req = req.Next() { - reqres := req.Value.(*ReqRes) - reqres.Done() - } - - // mark all queued messages as resolved -LOOP: - for { - select { - case reqres := <-cli.reqQueue: - reqres.Done() - default: - break LOOP - } - } -} - //---------------------------------------- func (cli *socketClient) Flush(ctx context.Context) error { @@ -385,6 +363,51 @@ func (cli *socketClient) FinalizeBlock(ctx context.Context, req *types.RequestFi return reqRes.Response.GetFinalizeBlock(), cli.Error() } +func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) { + reqres := NewReqRes(req) + + // TODO: set cli.err if reqQueue times out + select { + case cli.reqQueue <- reqres: + case <-ctx.Done(): + return nil, ctx.Err() + } + + // Maybe auto-flush, or unset auto-flush + switch req.Value.(type) { + case *types.Request_Flush: + cli.flushTimer.Unset() + default: + cli.flushTimer.Set() + } + + return reqres, nil +} + +// flushQueue marks as complete and discards all remaining pending requests +// from the queue. 
+func (cli *socketClient) flushQueue() { + cli.mtx.Lock() + defer cli.mtx.Unlock() + + // mark all in-flight messages as resolved (they will get cli.Error()) + for req := cli.reqSent.Front(); req != nil; req = req.Next() { + reqres := req.Value.(*ReqRes) + reqres.Done() + } + + // mark all queued messages as resolved +LOOP: + for { + select { + case reqres := <-cli.reqQueue: + reqres.Done() + default: + break LOOP + } + } +} + //---------------------------------------- func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index cbc0c5dd6..b881e82d5 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -33,7 +33,7 @@ func TestCalls(t *testing.T) { }() select { - case <-time.After(1 * time.Second): + case <-time.After(time.Second): require.Fail(t, "No response arrived") case err, ok := <-resp: require.True(t, ok, "Must not close channel") @@ -41,6 +41,38 @@ func TestCalls(t *testing.T) { } } +func TestHangingAsyncCalls(t *testing.T) { + app := slowApp{} + + s, c := setupClientServer(t, app) + + resp := make(chan error, 1) + go func() { + // Start BeginBlock and flush it + reqres, err := c.CheckTxAsync(context.Background(), &types.RequestCheckTx{}) + require.NoError(t, err) + // wait 20 ms for all events to travel socket, but + // no response yet from server + time.Sleep(50 * time.Millisecond) + // kill the server, so the connections break + err = s.Stop() + require.NoError(t, err) + + // wait for the response from BeginBlock + reqres.Wait() + fmt.Print(reqres) + resp <- c.Error() + }() + + select { + case <-time.After(time.Second): + require.Fail(t, "No response arrived") + case err, ok := <-resp: + require.True(t, ok, "Must not close channel") + assert.Error(t, err, "We should get EOF error") + } +} + func setupClientServer(t *testing.T, app types.Application) ( service.Service, abcicli.Client) { t.Helper() @@ -55,7 +87,7 @@ func setupClientServer(t *testing.T, app types.Application) ( t.Cleanup(func() { if err := s.Stop(); err != nil { - t.Error(err) + t.Log(err) } }) @@ -65,13 +97,22 @@ func setupClientServer(t *testing.T, app types.Application) ( t.Cleanup(func() { if err := c.Stop(); err != nil { - t.Error(err) + t.Log(err) } }) return s, c } +type slowApp struct { + types.BaseApplication +} + +func (slowApp) CheckTxAsync(_ context.Context, req types.RequestCheckTx) types.ResponseCheckTx { + time.Sleep(200 * time.Millisecond) + return types.ResponseCheckTx{} +} + // TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when // set after the client completes the call into the app. 
Currently this // test relies on the callback being allowed to be invoked twice if set multiple diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index 6507ac661..1053e0d7f 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -183,6 +183,7 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp }() for { + var req = &types.Request{} err := types.ReadMessage(bufReader, req) if err != nil { @@ -303,11 +304,12 @@ func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, res closeConn <- fmt.Errorf("error writing message: %w", err) return } - - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("error flushing write buffer: %w", err) - return + if _, ok := res.Value.(*types.Response_Flush); ok { + err = bufWriter.Flush() + if err != nil { + closeConn <- fmt.Errorf("error flushing write buffer: %w", err) + return + } } // If the application has responded with an exception, the server returns the error From 59276911e07a63b7aee5e4254f1e53a6e3f25c01 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 11:07:53 +0100 Subject: [PATCH 03/16] add grpc client test --- abci/client/grpc_client_test.go | 80 +++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 abci/client/grpc_client_test.go diff --git a/abci/client/grpc_client_test.go b/abci/client/grpc_client_test.go new file mode 100644 index 000000000..c1ed4f7da --- /dev/null +++ b/abci/client/grpc_client_test.go @@ -0,0 +1,80 @@ +package abcicli_test + +import ( + "fmt" + "math/rand" + "net" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "google.golang.org/grpc" + + "golang.org/x/net/context" + + "github.com/tendermint/tendermint/libs/log" + tmnet "github.com/tendermint/tendermint/libs/net" + + abciserver "github.com/tendermint/tendermint/abci/server" + "github.com/tendermint/tendermint/abci/types" +) + +func TestGRPC(t *testing.T) { + app := types.NewBaseApplication() + numCheckTxs := 2000 + socketFile := fmt.Sprintf("/tmp/test-%08x.sock", rand.Int31n(1<<30)) + defer os.Remove(socketFile) + socket := fmt.Sprintf("unix://%v", socketFile) + + // Start the listener + server := abciserver.NewGRPCServer(socket, app) + server.SetLogger(log.TestingLogger().With("module", "abci-server")) + err := server.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if err := server.Stop(); err != nil { + t.Error(err) + } + }) + + // Connect to the socket + //nolint:staticcheck // SA1019 Existing use of deprecated but supported dial option. 
+ conn, err := grpc.Dial(socket, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) + require.NoError(t, err) + + t.Cleanup(func() { + if err := conn.Close(); err != nil { + t.Error(err) + } + }) + + client := types.NewABCIClient(conn) + + // Write requests + for counter := 0; counter < numCheckTxs; counter++ { + // Send request + response, err := client.CheckTx(context.Background(), &types.RequestCheckTx{Tx: []byte("test")}) + require.NoError(t, err) + counter++ + if response.Code != 0 { + t.Error("CheckTx failed with ret_code", response.Code) + } + if counter > numCheckTxs { + t.Fatal("Too many CheckTx responses") + } + t.Log("response", counter) + if counter == numCheckTxs { + go func() { + time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow + }() + } + + } +} + +func dialerFunc(ctx context.Context, addr string) (net.Conn, error) { + return tmnet.Connect(addr) +} \ No newline at end of file From 77401582cc1839cf14a481a8171e0f88a0dc64cf Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 11:08:22 +0100 Subject: [PATCH 04/16] modify socket client to using flush after every sync call --- abci/client/socket_client.go | 55 +++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index aee9c64ac..061cfc0a5 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -239,10 +239,11 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { //---------------------------------------- func (cli *socketClient) Flush(ctx context.Context) error { - _, err := cli.queueRequest(ctx, types.ToRequestFlush()) + reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush()) if err != nil { return err } + reqRes.Wait() return nil } @@ -251,7 +252,9 @@ func (cli *socketClient) Echo(ctx context.Context, msg string) (*types.ResponseE if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetEcho(), cli.Error() } @@ -260,7 +263,9 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetInfo(), cli.Error() } @@ -269,7 +274,9 @@ func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetCheckTx(), cli.Error() } @@ -278,7 +285,9 @@ func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*t if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetQuery(), cli.Error() } @@ -287,7 +296,9 @@ func (cli *socketClient) Commit(ctx context.Context, req *types.RequestCommit) ( if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetCommit(), cli.Error() } @@ -296,7 +307,9 @@ func (cli *socketClient) InitChain(ctx context.Context, req *types.RequestInitCh if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetInitChain(), cli.Error() } @@ -305,7 +318,9 @@ func (cli *socketClient) ListSnapshots(ctx context.Context, req *types.RequestLi if err != nil { return nil, err } - reqRes.Wait() + 
if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetListSnapshots(), cli.Error() } @@ -314,7 +329,9 @@ func (cli *socketClient) OfferSnapshot(ctx context.Context, req *types.RequestOf if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetOfferSnapshot(), cli.Error() } @@ -323,7 +340,9 @@ func (cli *socketClient) LoadSnapshotChunk(ctx context.Context, req *types.Reque if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetLoadSnapshotChunk(), cli.Error() } @@ -332,7 +351,9 @@ func (cli *socketClient) ApplySnapshotChunk(ctx context.Context, req *types.Requ if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetApplySnapshotChunk(), cli.Error() } @@ -341,7 +362,9 @@ func (cli *socketClient) PrepareProposal(ctx context.Context, req *types.Request if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetPrepareProposal(), cli.Error() } @@ -350,7 +373,9 @@ func (cli *socketClient) ProcessProposal(ctx context.Context, req *types.Request if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetProcessProposal(), cli.Error() } @@ -359,7 +384,9 @@ func (cli *socketClient) FinalizeBlock(ctx context.Context, req *types.RequestFi if err != nil { return nil, err } - reqRes.Wait() + if err := cli.Flush(ctx); err != nil { + return nil, err + } return reqRes.Response.GetFinalizeBlock(), cli.Error() } From be8ea813d1d304a225164742e5b3bc20b5aa04b4 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 11:08:48 +0100 Subject: [PATCH 05/16] update abcicli --- abci/cmd/abci-cli/abci-cli.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/abci/cmd/abci-cli/abci-cli.go b/abci/cmd/abci-cli/abci-cli.go index d1d0e4f44..11ffe2016 100644 --- a/abci/cmd/abci-cli/abci-cli.go +++ b/abci/cmd/abci-cli/abci-cli.go @@ -167,10 +167,9 @@ where example.file looks something like: check_tx 0x00 check_tx 0xff - deliver_tx 0x00 + finalize_block 0x00 check_tx 0x00 - deliver_tx 0x01 - deliver_tx 0x04 + finalize_block 0x01 0x04 0xff info `, Args: cobra.ExactArgs(0), @@ -186,7 +185,7 @@ This command opens an interactive console for running any of the other commands without opening a new connection each time `, Args: cobra.ExactArgs(0), - ValidArgs: []string{"echo", "info", "deliver_tx", "check_tx", "prepare_proposal", "process_proposal", "commit", "query"}, + ValidArgs: []string{"echo", "info", "finalize_block", "check_tx", "prepare_proposal", "process_proposal", "commit", "query"}, RunE: cmdConsole, } From 77a02b2ee626e9320dca033778d2d4b3dba8d656 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 11:09:05 +0100 Subject: [PATCH 06/16] fix mempool test --- consensus/mempool_test.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 4b26e0d7a..ca40d65f3 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -2,7 +2,6 @@ package consensus import ( "context" - "encoding/binary" "fmt" "os" "testing" @@ -150,23 +149,25 @@ func TestMempoolRmBadTx(t *testing.T) { require.NoError(t, err) // increment the counter by 1 - txBytes := 
make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(0)) - - res, err := app.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{Txs: [][]byte{kvstore.NewTx("key", "value")}}) + txBytes := kvstore.NewTx("key", "value") + res, err := app.FinalizeBlock(context.Background(), &abci.RequestFinalizeBlock{Txs: [][]byte{txBytes}}) require.NoError(t, err) assert.False(t, res.TxResults[0].IsErr()) assert.True(t, len(res.AgreedAppData) > 0) + _, err = app.Commit(context.Background(), &abci.RequestCommit{}) + require.NoError(t, err) + emptyMempoolCh := make(chan struct{}) checkTxRespCh := make(chan struct{}) go func() { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.ResponseCheckTx) { + invalidTx := []byte("invalidTx") + err := assertMempool(cs.txNotifier).CheckTx(invalidTx, func(r *abci.ResponseCheckTx) { if r.Code != kvstore.CodeTypeInvalidTxFormat { - t.Errorf("expected checktx to return bad nonce, got %v", r) + t.Errorf("expected checktx to return invalid format, got %v", r) return } checkTxRespCh <- struct{}{} @@ -178,7 +179,7 @@ func TestMempoolRmBadTx(t *testing.T) { // check for the tx for { - txs := assertMempool(cs.txNotifier).ReapMaxBytesMaxGas(int64(len(txBytes)), -1) + txs := assertMempool(cs.txNotifier).ReapMaxBytesMaxGas(int64(len(invalidTx)), -1) if len(txs) == 0 { emptyMempoolCh <- struct{}{} return From d807d51cd621f8aee0ca212c6a1ffe88dfd5b95c Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 11:21:32 +0100 Subject: [PATCH 07/16] add extra socket client test --- abci/client/socket_client_test.go | 50 +++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index b881e82d5..ceb2765d2 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -3,6 +3,8 @@ package abcicli_test import ( "context" "fmt" + "math/rand" + "os" "sync" "testing" "time" @@ -73,6 +75,54 @@ func TestHangingAsyncCalls(t *testing.T) { } } +func TestBulk(t *testing.T) { + const numTxs = 700000 + // use a socket instead of a port + socketFile := fmt.Sprintf("test-%08x.sock", rand.Int31n(1<<30)) + defer os.Remove(socketFile) + socket := fmt.Sprintf("unix://%v", socketFile) + app := types.NewBaseApplication() + // Start the listener + server := server.NewSocketServer(socket, app) + t.Cleanup(func() { + if err := server.Stop(); err != nil { + t.Log(err) + } + }) + err := server.Start() + require.NoError(t, err) + + // Connect to the socket + client := abcicli.NewSocketClient(socket, false) + + t.Cleanup(func() { + if err := client.Stop(); err != nil { + t.Log(err) + } + }) + + err = client.Start() + require.NoError(t, err) + + // Construct request + rfb := &types.RequestFinalizeBlock{Txs: make([][]byte, numTxs)} + for counter := 0; counter < numTxs; counter++ { + rfb.Txs[counter] = []byte("test") + } + // Send bulk request + res, err := client.FinalizeBlock(context.Background(), rfb) + require.NoError(t, err) + require.Equal(t, numTxs, len(res.TxResults), "Number of txs doesn't match") + for _, tx := range res.TxResults { + require.Equal(t, uint32(0), tx.Code, "Tx failed") + } + + // Send final flush message + err = client.Flush(context.Background()) + require.NoError(t, err) +} + + func setupClientServer(t *testing.T, app types.Application) ( service.Service, abcicli.Client) { 
t.Helper() From f6ceff03285616cb5f35c776bff2cc168032dbba Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 11:34:06 +0100 Subject: [PATCH 08/16] add TestFinalizeBlockCalled --- consensus/state_test.go | 72 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/consensus/state_test.go b/consensus/state_test.go index 211349598..648359845 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1421,6 +1421,78 @@ func TestProcessProposalAccept(t *testing.T) { } } +func TestFinalizeBlockCalled(t *testing.T) { + for _, testCase := range []struct { + name string + voteNil bool + expectCalled bool + }{ + { + name: "finalize block called when block committed", + voteNil: false, + expectCalled: true, + }, + { + name: "not called when block not committed", + voteNil: true, + expectCalled: false, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + m := abcimocks.NewApplication(t) + m.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{ + Status: abci.ResponseProcessProposal_ACCEPT, + }, nil) + m.On("PrepareProposal", mock.Anything, mock.Anything).Return(&abci.ResponsePrepareProposal{}, nil) + r := &abci.ResponseFinalizeBlock{AgreedAppData: []byte("the_hash")} + m.On("FinalizeBlock", mock.Anything, mock.Anything).Return(r, nil).Maybe() + m.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil).Maybe() + + cs1, vss := randStateWithApp(4, m) + height, round := cs1.Height, cs1.Round + + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + pv1, err := cs1.privValidator.GetPubKey() + require.NoError(t, err) + addr := pv1.Address() + voteCh := subscribeToVoter(cs1, addr) + + startTestRound(cs1, cs1.Height, round) + ensureNewRound(newRoundCh, height, round) + ensureNewProposal(proposalCh, height, round) + rs := cs1.GetRoundState() + + blockID := types.BlockID{} + nextRound := round + 1 + nextHeight := height + if !testCase.voteNil { + nextRound = 0 + nextHeight = height + 1 + blockID = types.BlockID{ + Hash: rs.ProposalBlock.Hash(), + PartSetHeader: rs.ProposalBlockParts.Header(), + } + } + + signAddVotes(cs1, tmproto.PrevoteType, blockID.Hash, blockID.PartSetHeader, vss[1:]...) + ensurePrevoteMatch(t, voteCh, height, round, rs.ProposalBlock.Hash()) + + signAddVotes(cs1, tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader, vss[1:]...) 
+ ensurePrecommit(voteCh, height, round) + + ensureNewRound(newRoundCh, nextHeight, nextRound) + m.AssertExpectations(t) + + if !testCase.expectCalled { + m.AssertNotCalled(t, "FinalizeBlock", context.TODO(), mock.Anything) + } else { + m.AssertCalled(t, "FinalizeBlock", context.TODO(), mock.Anything) + } + }) + } +} + // 4 vals, 3 Nil Precommits at P0 // What we want: // P0 waits for timeoutPrecommit before starting next round From 418975ecd6200e21496589aa61a67fd1dc505ead Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 12:11:47 +0100 Subject: [PATCH 09/16] remove flush call after recheck tx --- mempool/v0/clist_mempool.go | 6 +++--- mempool/v1/mempool.go | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index f1e323c79..9fb23c52a 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -664,9 +664,9 @@ func (mem *CListMempool) recheckTxs() { } } - if err := mem.proxyAppConn.Flush(context.TODO()); err != nil { - mem.logger.Error("recheckTx flush", err, "err") - } + // In Date: Tue, 22 Nov 2022 14:26:31 +0100 Subject: [PATCH 10/16] restore discard abci responses --- abci/example/kvstore/kvstore.go | 2 +- blocksync/reactor_test.go | 4 +- config/config.go | 6 +- config/toml.go | 2 +- consensus/byzantine_test.go | 2 +- consensus/common_test.go | 6 +- consensus/mempool_test.go | 4 +- consensus/reactor_test.go | 2 +- consensus/replay_file.go | 2 +- consensus/replay_test.go | 321 ++++++++++++++++++++++++++++---- consensus/wal_generator.go | 10 +- consensus/wal_test.go | 6 +- light/detector.go | 2 +- node/node.go | 2 +- node/node_test.go | 6 +- node/setup.go | 2 +- rpc/core/blocks_test.go | 2 +- state/execution_test.go | 22 +-- state/export_test.go | 2 +- state/helpers_test.go | 2 +- state/rollback_test.go | 6 +- state/state_test.go | 18 +- state/store.go | 12 +- state/store_test.go | 12 +- state/tx_filter_test.go | 2 +- state/validation_test.go | 6 +- store/store_test.go | 8 +- 27 files changed, 359 insertions(+), 112 deletions(-) diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 43928ffa8..f80a4fd66 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -61,7 +61,7 @@ func NewPersistentApplication(dbDir string) *Application { name := "kvstore" db, err := dbm.NewGoLevelDB(name, dbDir) if err != nil { - panic(err) + panic(fmt.Errorf("failed to create persistent app at %s: %w", dbDir, err)) } return NewApplication(db) } diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index c845b066c..10c7105cf 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -74,7 +74,7 @@ func newReactor( blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := store.NewBlockStore(blockDB) @@ -101,7 +101,7 @@ func newReactor( fastSync := true db := dbm.NewMemDB() stateStore = sm.NewStore(db, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mp, sm.EmptyEvidencePool{}, blockStore) diff --git a/config/config.go b/config/config.go index cc2491f84..c1ad2e229 100644 --- a/config/config.go +++ b/config/config.go @@ -1120,14 +1120,14 @@ type StorageConfig struct { // Set to false to ensure ABCI responses are persisted. 
ABCI responses are // required for `/block_results` RPC queries, and to reindex events in the // command-line tool. - DiscardFinalizeBlockResponses bool `mapstructure:"discard_abci_responses"` + DiscardABCIResponses bool `mapstructure:"discard_abci_responses"` } // DefaultStorageConfig returns the default configuration options relating to // Tendermint storage optimization. func DefaultStorageConfig() *StorageConfig { return &StorageConfig{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, } } @@ -1135,7 +1135,7 @@ func DefaultStorageConfig() *StorageConfig { // testing. func TestStorageConfig() *StorageConfig { return &StorageConfig{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, } } diff --git a/config/toml.go b/config/toml.go index a5550ff63..6041b92c3 100644 --- a/config/toml.go +++ b/config/toml.go @@ -514,7 +514,7 @@ peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}" # considerable amount of disk space. Set to false to ensure ABCI responses are # persisted. ABCI responses are required for /block_results RPC queries, and to # reindex events in the command-line tool. -discard_abci_responses = {{ .Storage.DiscardFinalizeBlockResponses}} +discard_abci_responses = {{ .Storage.DiscardABCIResponses}} ####################################################### ### Transaction Indexer Configuration Options ### diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index d86ab3cba..a1cb43949 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -53,7 +53,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { logger := consensusLogger().With("test", "byzantine", "validator", i) stateDB := dbm.NewMemDB() // each state needs its own db stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc) thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) diff --git a/consensus/common_test.go b/consensus/common_test.go index e95df8e8c..2144bd556 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -434,7 +434,7 @@ func newStateWithConfigAndBlockStore( // Make State stateDB := blockDB stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) if err := stateStore.Save(state); err != nil { // for save height 1's validators info @@ -759,7 +759,7 @@ func randConsensusNet(t *testing.T, nValidators int, testName string, tickerFunc for i := 0; i < nValidators; i++ { stateDB := dbm.NewMemDB() // each state needs its own db stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc) thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) @@ -801,7 +801,7 @@ func randConsensusNetWithPeers( for i := 0; i < nPeers; i++ { stateDB := dbm.NewMemDB() // each state needs its own db stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) t.Cleanup(func() { _ = stateStore.Close() }) state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index ca40d65f3..a729e5b2d 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -118,7 +118,7 @@ func deliverTxsRange(t *testing.T, cs *State, start, end int) { func 
TestMempoolTxConcurrentWithCommit(t *testing.T) { state, privVals := randGenesisState(1, false, 10) blockDB := dbm.NewMemDB() - stateStore := sm.NewStore(blockDB, sm.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := sm.NewStore(blockDB, sm.StoreOptions{DiscardABCIResponses: false}) cs := newStateWithConfigAndBlockStore(config, state, privVals[0], kvstore.NewInMemoryApplication(), blockDB) err := stateStore.Save(state) require.NoError(t, err) @@ -143,7 +143,7 @@ func TestMempoolRmBadTx(t *testing.T) { state, privVals := randGenesisState(1, false, 10) app := kvstore.NewInMemoryApplication() blockDB := dbm.NewMemDB() - stateStore := sm.NewStore(blockDB, sm.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := sm.NewStore(blockDB, sm.StoreOptions{DiscardABCIResponses: false}) cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB) err := stateStore.Save(state) require.NoError(t, err) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 47befb3e9..1f52df754 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -139,7 +139,7 @@ func TestReactorWithEvidence(t *testing.T) { for i := 0; i < nValidators; i++ { stateDB := dbm.NewMemDB() // each state needs its own db stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc) thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 905f363a0..c342c32bd 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -298,7 +298,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo tmos.Exit(err.Error()) } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) gdoc, err := sm.MakeGenesisDocFromFile(config.GenesisFile()) if err != nil { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 3ebff40fe..72c87e743 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "runtime" + "sort" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/tendermint/tendermint/abci/types/mocks" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" + cryptoenc "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/internal/test" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -163,7 +165,7 @@ LOOP: blockDB := dbm.NewMemDB() stateDB := blockDB stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) require.NoError(t, err) @@ -308,31 +310,272 @@ const numBlocks = 6 // 3 - save block and committed with truncated block store and state behind var modes = []uint{0, 1, 2, 3} +// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay +func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, []*types.Block, []*types.Commit, sm.State) { + nPeers := 7 + nVals := 4 + css, genDoc, config, cleanup := randConsensusNetWithPeers( + t, + nVals, + nPeers, + name, + newMockTickerFunc(true), + newPersistentKVStoreWithPath) + genesisState, err := sm.MakeGenesisState(genDoc) + require.NoError(t, err) + 
t.Cleanup(cleanup) + + partSize := types.BlockPartSizeBytes + + newRoundCh := subscribe(css[0].eventBus, types.EventQueryNewRound) + proposalCh := subscribe(css[0].eventBus, types.EventQueryCompleteProposal) + + vss := make([]*validatorStub, nPeers) + for i := 0; i < nPeers; i++ { + vss[i] = newValidatorStub(css[i].privValidator, int32(i)) + } + height, round := css[0].Height, css[0].Round + + // start the machine + startTestRound(css[0], height, round) + incrementHeight(vss...) + ensureNewRound(newRoundCh, height, 0) + ensureNewProposal(proposalCh, height, round) + rs := css[0].GetRoundState() + signAddVotes(css[0], tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 2 + height++ + incrementHeight(vss...) + newValidatorPubKey1, err := css[nVals].privValidator.GetPubKey() + require.NoError(t, err) + valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1) + require.NoError(t, err) + newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err := css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err := propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal := types.NewProposal(vss[1].Height, round, -1, blockID) + p := proposal.ToProto() + if err := vss[1].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + signAddVotes(css[0], tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 3 + height++ + incrementHeight(vss...) + updateValidatorPubKey1, err := css[nVals].privValidator.GetPubKey() + require.NoError(t, err) + updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1) + require.NoError(t, err) + updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) + err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + + proposal = types.NewProposal(vss[2].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[2].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + signAddVotes(css[0], tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 4 + height++ + incrementHeight(vss...) 
+ newValidatorPubKey2, err := css[nVals+1].privValidator.GetPubKey() + require.NoError(t, err) + newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2) + require.NoError(t, err) + newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempool.TxInfo{}) + assert.Nil(t, err) + newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey() + require.NoError(t, err) + newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3) + require.NoError(t, err) + newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + newVss := make([]*validatorStub, nVals+1) + copy(newVss, vss[:nVals+1]) + sort.Sort(ValidatorStubsByPower(newVss)) + + valIndexFn := func(cssIdx int) int { + for i, vs := range newVss { + vsPubKey, err := vs.GetPubKey() + require.NoError(t, err) + + cssPubKey, err := css[cssIdx].privValidator.GetPubKey() + require.NoError(t, err) + + if vsPubKey.Equals(cssPubKey) { + return i + } + } + panic(fmt.Sprintf("validator css[%d] not found in newVss", cssIdx)) + } + + selfIndex := valIndexFn(0) + + proposal = types.NewProposal(vss[3].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[3].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + + removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempool.TxInfo{}) + assert.Nil(t, err) + + rs = css[0].GetRoundState() + for i := 0; i < nVals+1; i++ { + if i == selfIndex { + continue + } + signAddVotes(css[0], tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) + } + + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 5 + height++ + incrementHeight(vss...) + // Reflect the changes to vss[nVals] at height 3 and resort newVss. + newVssIdx := valIndexFn(nVals) + newVss[newVssIdx].VotingPower = 25 + sort.Sort(ValidatorStubsByPower(newVss)) + selfIndex = valIndexFn(0) + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + for i := 0; i < nVals+1; i++ { + if i == selfIndex { + continue + } + signAddVotes(css[0], tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) + } + ensureNewRound(newRoundCh, height+1, 0) + + // HEIGHT 6 + height++ + incrementHeight(vss...) 
+ removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempool.TxInfo{}) + assert.NoError(t, err) + propBlock, err = css[0].createProposalBlock() // changeProposer(t, cs1, vs2) + require.NoError(t, err) + propBlockParts, err = propBlock.MakePartSet(partSize) + require.NoError(t, err) + blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} + newVss = make([]*validatorStub, nVals+3) + copy(newVss, vss[:nVals+3]) + sort.Sort(ValidatorStubsByPower(newVss)) + + selfIndex = valIndexFn(0) + proposal = types.NewProposal(vss[1].Height, round, -1, blockID) + p = proposal.ToProto() + if err := vss[1].SignProposal(test.DefaultTestChainID, p); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + proposal.Signature = p.Signature + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + for i := 0; i < nVals+3; i++ { + if i == selfIndex { + continue + } + signAddVotes(css[0], tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) + } + ensureNewRound(newRoundCh, height+1, 0) + + chain := make([]*types.Block, 0) + commits := make([]*types.Commit, 0) + for i := 1; i <= numBlocks; i++ { + chain = append(chain, css[0].blockStore.LoadBlock(int64(i))) + commits = append(commits, css[0].blockStore.LoadBlockCommit(int64(i))) + } + return config, chain, commits, genesisState +} + // Sync from scratch func TestHandshakeReplayAll(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, 0, m) + t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) { + testHandshakeReplay(t, config, 0, m, false) + }) + t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) { + testHandshakeReplay(t, config, 0, m, false) + }) } } // Sync many, not from scratch func TestHandshakeReplaySome(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, 2, m) + testHandshakeReplay(t, config, 2, m, false) + testHandshakeReplay(t, config, 2, m, true) } } // Sync from lagging by one func TestHandshakeReplayOne(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, numBlocks-1, m) + testHandshakeReplay(t, config, numBlocks-1, m, false) + testHandshakeReplay(t, config, numBlocks-1, m, true) } } // Sync from caught up func TestHandshakeReplayNone(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, numBlocks, m) + testHandshakeReplay(t, config, numBlocks, m, false) + testHandshakeReplay(t, config, numBlocks, m, true) } } @@ -353,8 +596,9 @@ func tempWALWithData(data []byte) string { // Make some blocks. Start a fresh app and apply nBlocks blocks. 
// Then restart the app and sync it up with the remaining blocks -func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint) { +func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) { var ( + testConfig *cfg.Config chain []*types.Block commits []*types.Commit store *mockBlockStore @@ -364,35 +608,40 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin evpool = sm.EmptyEvidencePool{} ) - testConfig := ResetConfig(fmt.Sprintf("%d_%d_s", nBlocks, mode)) - t.Cleanup(func() { - _ = os.RemoveAll(testConfig.RootDir) - }) - walBody, err := WALWithNBlocks(t, numBlocks) - require.NoError(t, err) - walFile := tempWALWithData(walBody) - config.Consensus.SetWalFile(walFile) + if testValidatorsChange { + testConfig, chain, commits, genesisState = setupChainWithChangingValidators(t, fmt.Sprintf("%d_%d_m", nBlocks, mode)) + store = newMockBlockStore(t, config, genesisState.ConsensusParams) + } else { + testConfig = ResetConfig(fmt.Sprintf("%d_%d_s", nBlocks, mode)) + t.Cleanup(func() { + _ = os.RemoveAll(testConfig.RootDir) + }) + walBody, err := WALWithNBlocks(t, numBlocks, testConfig) + require.NoError(t, err) + walFile := tempWALWithData(walBody) + config.Consensus.SetWalFile(walFile) - privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) + privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) - wal, err := NewWAL(walFile) - require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) - err = wal.Start() - require.NoError(t, err) - t.Cleanup(func() { - if err := wal.Stop(); err != nil { - t.Error(err) - } - }) - chain, commits, err = makeBlockchainFromWAL(wal) - require.NoError(t, err) - pubKey, err := privVal.GetPubKey() - require.NoError(t, err) - stateDB, genesisState, store = stateAndStore(t, config, pubKey, kvstore.AppVersion) + wal, err := NewWAL(walFile) + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) + err = wal.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := wal.Stop(); err != nil { + t.Error(err) + } + }) + chain, commits, err = makeBlockchainFromWAL(wal) + require.NoError(t, err) + pubKey, err := privVal.GetPubKey() + require.NoError(t, err) + stateDB, genesisState, store = stateAndStore(t, config, pubKey, kvstore.AppVersion) + } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) t.Cleanup(func() { _ = stateStore.Close() @@ -419,7 +668,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() stateStore := sm.NewStore(stateDB1, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) err := stateStore.Save(genesisState) require.NoError(t, err) @@ -449,7 +698,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin } }) - err = handshaker.Handshake(proxyApp) + err := handshaker.Handshake(proxyApp) if expectError { require.Error(t, err) return @@ -604,7 +853,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { require.NoError(t, err) stateDB, state, store := stateAndStore(t, config, pubKey, appVersion) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) 
state.LastValidators = state.Validators.Copy() @@ -821,7 +1070,7 @@ func stateAndStore( ) (dbm.DB, sm.State, *mockBlockStore) { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := sm.MakeGenesisStateFromFile(config.GenesisFile()) require.NoError(t, err) @@ -917,7 +1166,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) { require.NoError(t, err) stateDB, state, store := stateAndStore(t, config, pubKey, 0x0) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) oldValAddr := state.Validators.Validators[0].Address diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index a84ddc0b6..4501bb42d 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -28,9 +28,7 @@ import ( // persistent kvstore application and special consensus wal instance // (byteBufferWAL) and waits until numBlocks are created. // If the node fails to produce given numBlocks, it returns an error. -func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { - config := getConfig(t) - +func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int, config *cfg.Config) (err error) { app := kvstore.NewPersistentApplication(filepath.Join(config.DBDir(), "wal_generator")) logger := log.TestingLogger().With("wal_generator", "wal_generator") @@ -49,7 +47,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { blockStoreDB := db.NewMemDB() stateDB := blockStoreDB stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := sm.MakeGenesisState(genDoc) if err != nil { @@ -123,11 +121,11 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { } // WALWithNBlocks returns a WAL content with numBlocks. -func WALWithNBlocks(t *testing.T, numBlocks int) (data []byte, err error) { +func WALWithNBlocks(t *testing.T, numBlocks int, config *cfg.Config) (data []byte, err error) { var b bytes.Buffer wr := bufio.NewWriter(&b) - if err := WALGenerateNBlocks(t, wr, numBlocks); err != nil { + if err := WALGenerateNBlocks(t, wr, numBlocks, config); err != nil { return []byte{}, err } diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 12b775b41..80367c000 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -56,7 +56,7 @@ func TestWALTruncate(t *testing.T) { // 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), // when headBuf is full, truncate content will Flush to the file. at this // time, RotateFile is called, truncate content exist in each file. 
- err = WALGenerateNBlocks(t, wal.Group(), 60) + err = WALGenerateNBlocks(t, wal.Group(), 60, getConfig(t)) require.NoError(t, err) time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run @@ -150,7 +150,7 @@ func TestWALWrite(t *testing.T) { } func TestWALSearchForEndHeight(t *testing.T) { - walBody, err := WALWithNBlocks(t, 6) + walBody, err := WALWithNBlocks(t, 6, getConfig(t)) if err != nil { t.Fatal(err) } @@ -188,7 +188,7 @@ func TestWALPeriodicSync(t *testing.T) { wal.SetLogger(log.TestingLogger()) // Generate some data - err = WALGenerateNBlocks(t, wal.Group(), 5) + err = WALGenerateNBlocks(t, wal.Group(), 5, getConfig(t)) require.NoError(t, err) // We should have data in the buffer now diff --git a/light/detector.go b/light/detector.go index 1fd21f41e..dd56d9428 100644 --- a/light/detector.go +++ b/light/detector.go @@ -34,7 +34,7 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig lastVerifiedHeader = primaryTrace[len(primaryTrace)-1].SignedHeader witnessesToRemove = make([]int, 0) ) - c.logger.Debug("Running detector against trace", "endBlockHeight", lastVerifiedHeader.Height, + c.logger.Debug("Running detector against trace", "finalizeBlockHeight", lastVerifiedHeader.Height, "endBlockHash", lastVerifiedHeader.Hash, "length", len(primaryTrace)) c.providerMutex.Lock() diff --git a/node/node.go b/node/node.go index 252715223..ddf86e0dc 100644 --- a/node/node.go +++ b/node/node.go @@ -152,7 +152,7 @@ func NewNode(config *cfg.Config, } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: config.Storage.DiscardFinalizeBlockResponses, + DiscardABCIResponses: config.Storage.DiscardABCIResponses, }) state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider) diff --git a/node/node_test.go b/node/node_test.go index 9f389a9fd..436218d78 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -261,7 +261,7 @@ func TestCreateProposalBlock(t *testing.T) { var height int64 = 1 state, stateDB, privVals := state(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) maxBytes := 16384 var partSize uint32 = 256 @@ -373,7 +373,7 @@ func TestMaxProposalBlockSize(t *testing.T) { var height int64 = 1 state, stateDB, _ := state(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) var maxBytes int64 = 16384 var partSize uint32 = 256 @@ -505,7 +505,7 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) { // save validators to db for 2 heights stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) if err := stateStore.Save(s); err != nil { panic(err) diff --git a/node/setup.go b/node/setup.go index dc5498ba1..5fedcbe8e 100644 --- a/node/setup.go +++ b/node/setup.go @@ -620,7 +620,7 @@ func LoadStateFromDBOrGenesisDocProvider( } } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) if err != nil { diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 1787a250b..8256920d9 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -79,7 +79,7 @@ func TestBlockResults(t *testing.T) { env = &Environment{} env.StateStore = sm.NewStore(dbm.NewMemDB(), 
sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) err := env.StateStore.SaveFinalizeBlockResponse(100, results) require.NoError(t, err) diff --git a/state/execution_test.go b/state/execution_test.go index 953d0a5de..78830fcc7 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -47,7 +47,7 @@ func TestApplyBlock(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := store.NewBlockStore(dbm.NewMemDB()) @@ -88,7 +88,7 @@ func TestFinalizeBlockValidators(t *testing.T) { state, stateDB, _ := makeState(2, 2) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) prevHash := state.LastBlockID.Hash @@ -153,7 +153,7 @@ func TestFinalizeBlockMisbehavior(t *testing.T) { state, stateDB, privVals := makeState(1, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) @@ -270,7 +270,7 @@ func TestProcessProposal(t *testing.T) { state, stateDB, privVals := makeState(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := store.NewBlockStore(dbm.NewMemDB()) eventBus := types.NewEventBus() @@ -478,7 +478,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) mp := &mpmocks.Mempool{} mp.On("Lock").Return() @@ -567,7 +567,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { state, stateDB, _ := makeState(1, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( @@ -608,7 +608,7 @@ func TestEmptyPrepareProposal(t *testing.T) { state, stateDB, privVals := makeState(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) mp := &mpmocks.Mempool{} mp.On("Lock").Return() @@ -646,7 +646,7 @@ func TestPrepareProposalTxsAllIncluded(t *testing.T) { state, stateDB, privVals := makeState(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) evpool := &mocks.EvidencePool{} @@ -695,7 +695,7 @@ func TestPrepareProposalReorderTxs(t *testing.T) { state, stateDB, privVals := makeState(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) evpool := &mocks.EvidencePool{} @@ -750,7 +750,7 @@ func TestPrepareProposalErrorOnTooManyTxs(t *testing.T) { // limit max block size state.ConsensusParams.Block.MaxBytes = 60 * 1024 stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) evpool := &mocks.EvidencePool{} @@ -801,7 +801,7 @@ func TestPrepareProposalErrorOnPrepareProposalError(t *testing.T) { state, stateDB, privVals := makeState(1, height) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) evpool := 
&mocks.EvidencePool{} diff --git a/state/export_test.go b/state/export_test.go index f1a60f29f..331cfb3b7 100644 --- a/state/export_test.go +++ b/state/export_test.go @@ -41,6 +41,6 @@ func ValidateValidatorUpdates(abciUpdates []abci.ValidatorUpdate, params types.V // SaveValidatorsInfo is an alias for the private saveValidatorsInfo method in // store.go, exported exclusively and explicitly for testing. func SaveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) error { - stateStore := dbStore{db, StoreOptions{DiscardFinalizeBlockResponses: false}} + stateStore := dbStore{db, StoreOptions{DiscardABCIResponses: false}} return stateStore.saveValidatorsInfo(height, lastHeightChanged, valSet) } diff --git a/state/helpers_test.go b/state/helpers_test.go index 0e9652d2c..f1d5c971e 100644 --- a/state/helpers_test.go +++ b/state/helpers_test.go @@ -124,7 +124,7 @@ func makeState(nVals, height int) (sm.State, dbm.DB, map[string]types.PrivValida stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) if err := stateStore.Save(s); err != nil { panic(err) diff --git a/state/rollback_test.go b/state/rollback_test.go index 08a7ea9c3..9e2d03efc 100644 --- a/state/rollback_test.go +++ b/state/rollback_test.go @@ -88,7 +88,7 @@ func TestRollback(t *testing.T) { func TestRollbackHard(t *testing.T) { const height int64 = 100 blockStore := store.NewBlockStore(dbm.NewMemDB()) - stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardABCIResponses: false}) valSet, _ := types.RandValidatorSet(5, 10) @@ -204,7 +204,7 @@ func TestRollbackHard(t *testing.T) { func TestRollbackNoState(t *testing.T) { stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) blockStore := &mocks.BlockStore{} @@ -238,7 +238,7 @@ func TestRollbackDifferentStateHeight(t *testing.T) { } func setupStateStore(t *testing.T, height int64) state.Store { - stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore := state.NewStore(dbm.NewMemDB(), state.StoreOptions{DiscardABCIResponses: false}) valSet, _ := types.RandValidatorSet(5, 10) params := types.DefaultConsensusParams() diff --git a/state/state_test.go b/state/state_test.go index dc14b6bbb..d8ddf26d1 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -28,7 +28,7 @@ func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, sm.State) { dbType := dbm.BackendType(config.DBBackend) stateDB, err := dbm.NewDB("state", dbType, config.DBDir()) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) require.NoError(t, err) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) @@ -77,7 +77,7 @@ func TestStateSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) assert := assert.New(t) @@ -98,7 +98,7 @@ func TestFinalizeBlockResponsesSaveLoad1(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) assert := 
assert.New(t) @@ -131,7 +131,7 @@ func TestFinalizeBlockResponsesSaveLoad2(t *testing.T) { assert := assert.New(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) cases := [...]struct { @@ -219,7 +219,7 @@ func TestValidatorSimpleSaveLoad(t *testing.T) { assert := assert.New(t) statestore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) // Can't load anything for height 0. @@ -254,7 +254,7 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) // Change vals at these heights. @@ -912,7 +912,7 @@ func TestStoreLoadValidatorsIncrementsProposerPriority(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) t.Cleanup(func() { tearDown(t) }) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state.Validators = genValSet(valSetSize) state.NextValidators = state.Validators.CopyIncrementProposerPriority(1) @@ -939,7 +939,7 @@ func TestManyValidatorChangesSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t) defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) require.Equal(t, int64(0), state.LastBlockHeight) state.Validators = genValSet(valSetSize) @@ -1005,7 +1005,7 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) { defer tearDown(t) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) // Change vals at these heights. diff --git a/state/store.go b/state/store.go index bcb293d45..107658d82 100644 --- a/state/store.go +++ b/state/store.go @@ -84,11 +84,11 @@ type dbStore struct { } type StoreOptions struct { - // DiscardFinalizeBlockResponses determines whether or not the store - // retains all ABCIResponses. If DiscardFinalizeBlockResponses is enabled, + // DiscardABCIResponses determines whether or not the store + // retains all ABCIResponses. If DiscardABCIResponses is enabled, // the store will maintain only the response object from the latest // height. - DiscardFinalizeBlockResponses bool + DiscardABCIResponses bool } var _ Store = (*dbStore)(nil) @@ -375,11 +375,11 @@ func TxResultsHash(txResults []*abci.ExecTxResult) []byte { return types.NewResults(txResults).Hash() } -// LoadFinalizeBlockResponse loads the DiscardFinalizeBlockResponses for the given height from the +// LoadFinalizeBlockResponse loads the DiscardABCIResponses for the given height from the // database. If the node has D set to true, ErrABCIResponsesNotPersisted // is persisted. If not found, ErrNoABCIResponsesForHeight is returned. func (store dbStore) LoadFinalizeBlockResponse(height int64) (*abci.ResponseFinalizeBlock, error) { - if store.DiscardFinalizeBlockResponses { + if store.DiscardABCIResponses { return nil, ErrFinalizeBlockResponsesNotPersisted } @@ -469,7 +469,7 @@ func (store dbStore) SaveFinalizeBlockResponse(height int64, resp *abci.Response // If the flag is false then we save the ABCIResponse. This can be used for the /BlockResults // query or to reindex an event using the command line. 
- if !store.DiscardFinalizeBlockResponses { + if !store.DiscardABCIResponses { bz, err := resp.Marshal() if err != nil { return err diff --git a/state/store_test.go b/state/store_test.go index ec042886f..6d4715c96 100644 --- a/state/store_test.go +++ b/state/store_test.go @@ -22,7 +22,7 @@ import ( func TestStoreLoadValidators(t *testing.T) { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) val, _ := types.RandValidator(true, 10) vals := types.NewValidatorSet([]*types.Validator{val}) @@ -55,7 +55,7 @@ func BenchmarkLoadValidators(b *testing.B) { stateDB, err := dbm.NewDB("state", dbType, config.DBDir()) require.NoError(b, err) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) if err != nil { @@ -112,7 +112,7 @@ func TestPruneStates(t *testing.T) { t.Run(name, func(t *testing.T) { db := dbm.NewMemDB() stateStore := sm.NewStore(db, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) pk := ed25519.GenPrivKey().PubKey() @@ -238,7 +238,7 @@ func TestLastFinalizeBlockResponses(t *testing.T) { t.Run("Not persisting responses", func(t *testing.T) { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) responses, err := stateStore.LoadFinalizeBlockResponse(1) require.Error(t, err) @@ -251,7 +251,7 @@ func TestLastFinalizeBlockResponses(t *testing.T) { } // create new db and state store and set discard abciresponses to false. stateDB = dbm.NewMemDB() - stateStore = sm.NewStore(stateDB, sm.StoreOptions{DiscardFinalizeBlockResponses: false}) + stateStore = sm.NewStore(stateDB, sm.StoreOptions{DiscardABCIResponses: false}) height := int64(10) // save the last abci response. err = stateStore.SaveFinalizeBlockResponse(height, response1) @@ -281,7 +281,7 @@ func TestLastFinalizeBlockResponses(t *testing.T) { } // create a new statestore with the responses on. stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: true, + DiscardABCIResponses: true, }) // save an additional response. 
err := stateStore.SaveFinalizeBlockResponse(height+1, response2) diff --git a/state/tx_filter_test.go b/state/tx_filter_test.go index 52cc1ab31..d5ab761ac 100644 --- a/state/tx_filter_test.go +++ b/state/tx_filter_test.go @@ -34,7 +34,7 @@ func TestTxFilter(t *testing.T) { stateDB, err := dbm.NewDB("state", "memdb", os.TempDir()) require.NoError(t, err) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) require.NoError(t, err) diff --git a/state/validation_test.go b/state/validation_test.go index 7fc092bfa..f99dfbd83 100644 --- a/state/validation_test.go +++ b/state/validation_test.go @@ -32,7 +32,7 @@ func TestValidateBlockHeader(t *testing.T) { state, stateDB, privVals := makeState(3, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) mp := &mpmocks.Mempool{} mp.On("Lock").Return() @@ -120,7 +120,7 @@ func TestValidateBlockCommit(t *testing.T) { state, stateDB, privVals := makeState(1, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) mp := &mpmocks.Mempool{} mp.On("Lock").Return() @@ -254,7 +254,7 @@ func TestValidateBlockEvidence(t *testing.T) { state, stateDB, privVals := makeState(4, 1) stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) defaultEvidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC) diff --git a/store/store_test.go b/store/store_test.go index 47c944956..93c9227d6 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -50,7 +50,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFu blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) if err != nil { @@ -367,7 +367,7 @@ func TestLoadBaseMeta(t *testing.T) { config := test.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) @@ -429,7 +429,7 @@ func TestPruneBlocks(t *testing.T) { config := test.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) @@ -569,7 +569,7 @@ func TestLoadBlockMetaByHash(t *testing.T) { config := test.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) stateStore := sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) From 578322a6fb1a2a84045c40c20b7e071af304fbf6 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 15:08:26 +0100 Subject: [PATCH 11/16] clean up usage of begin block, deliver tx and end block --- abci/client/grpc_client_test.go | 2 +- abci/client/socket_client_test.go | 7 +++---- 
cmd/tendermint/commands/reindex_event.go | 2 +- cmd/tendermint/commands/rollback.go | 2 +- evidence/pool_test.go | 2 +- mempool/v0/reactor_test.go | 6 +++--- mempool/v1/mempool.go | 2 +- node/node.go | 2 +- rpc/client/interface.go | 6 +++--- rpc/core/blocks.go | 4 ++-- rpc/core/mempool.go | 18 +++++++++--------- rpc/core/types/responses.go | 2 +- state/indexer/block.go | 6 +++--- state/indexer/block/kv/kv.go | 15 +++++++-------- state/indexer/sink/psql/backport.go | 3 +-- state/indexer/sink/psql/psql.go | 2 +- state/indexer/sink/psql/psql_test.go | 13 ++----------- state/metrics.gen.go | 2 +- state/metrics.go | 2 +- state/state.go | 2 +- state/store_test.go | 4 ++-- types/block.go | 2 +- types/validation.go | 2 +- 23 files changed, 48 insertions(+), 60 deletions(-) diff --git a/abci/client/grpc_client_test.go b/abci/client/grpc_client_test.go index c1ed4f7da..7162ad7bb 100644 --- a/abci/client/grpc_client_test.go +++ b/abci/client/grpc_client_test.go @@ -77,4 +77,4 @@ func TestGRPC(t *testing.T) { func dialerFunc(ctx context.Context, addr string) (net.Conn, error) { return tmnet.Connect(addr) -} \ No newline at end of file +} diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index ceb2765d2..2995c431c 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -50,7 +50,7 @@ func TestHangingAsyncCalls(t *testing.T) { resp := make(chan error, 1) go func() { - // Start BeginBlock and flush it + // Call CheckTx reqres, err := c.CheckTxAsync(context.Background(), &types.RequestCheckTx{}) require.NoError(t, err) // wait 20 ms for all events to travel socket, but @@ -60,7 +60,7 @@ func TestHangingAsyncCalls(t *testing.T) { err = s.Stop() require.NoError(t, err) - // wait for the response from BeginBlock + // wait for the response from CheckTx reqres.Wait() fmt.Print(reqres) resp <- c.Error() @@ -94,7 +94,7 @@ func TestBulk(t *testing.T) { // Connect to the socket client := abcicli.NewSocketClient(socket, false) - + t.Cleanup(func() { if err := client.Stop(); err != nil { t.Log(err) @@ -122,7 +122,6 @@ func TestBulk(t *testing.T) { require.NoError(t, err) } - func setupClientServer(t *testing.T, app types.Application) ( service.Service, abcicli.Client) { t.Helper() diff --git a/cmd/tendermint/commands/reindex_event.go b/cmd/tendermint/commands/reindex_event.go index 65070ef60..f5a9acb4d 100644 --- a/cmd/tendermint/commands/reindex_event.go +++ b/cmd/tendermint/commands/reindex_event.go @@ -41,7 +41,7 @@ reindex from the base block height(inclusive); and the default end-height is 0, the tooling will reindex until the latest block height(inclusive). User can omit either or both arguments. -Note: This operation requires ABCI Responses. Do not set DiscardFinalizeBlockResponses to true if you +Note: This operation requires ABCI Responses. Do not set DiscardABCIResponses to true if you want to use this command. 
`, Example: ` diff --git a/cmd/tendermint/commands/rollback.go b/cmd/tendermint/commands/rollback.go index 8a60e96ac..c232c0b8d 100644 --- a/cmd/tendermint/commands/rollback.go +++ b/cmd/tendermint/commands/rollback.go @@ -90,7 +90,7 @@ func loadStateAndBlockStore(config *cfg.Config) (*store.BlockStore, state.Store, return nil, nil, err } stateStore := state.NewStore(stateDB, state.StoreOptions{ - DiscardFinalizeBlockResponses: config.Storage.DiscardFinalizeBlockResponses, + DiscardABCIResponses: config.Storage.DiscardABCIResponses, }) return blockStore, stateStore, nil diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 3b5c0626e..8283bb2f9 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -359,7 +359,7 @@ func TestRecoverPendingEvidence(t *testing.T) { func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) sm.Store { stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardFinalizeBlockResponses: false, + DiscardABCIResponses: false, }) state := sm.State{ ChainID: evidenceChainID, diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index 3e4ef9074..b5022e260 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -98,11 +98,11 @@ func TestReactorConcurrency(t *testing.T) { reactors[0].mempool.Lock() defer reactors[0].mempool.Unlock() - deliverTxResponses := make([]*abci.ExecTxResult, len(txs)) + txResponses := make([]*abci.ExecTxResult, len(txs)) for i := range txs { - deliverTxResponses[i] = &abci.ExecTxResult{Code: 0} + txResponses[i] = &abci.ExecTxResult{Code: 0} } - err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil) + err := reactors[0].mempool.Update(1, txs, txResponses, nil, nil) assert.NoError(t, err) }() diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index cd0a6cfd7..6f7b19e83 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -387,7 +387,7 @@ func (txmp *TxMempool) Update( ) error { // Safety check: Transactions and responses must match in number. if len(blockTxs) != len(txResults) { - panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses", + panic(fmt.Sprintf("mempool: got %d transactions but %d TxResult responses", len(blockTxs), len(txResults))) } diff --git a/node/node.go b/node/node.go index ddf86e0dc..2342257cf 100644 --- a/node/node.go +++ b/node/node.go @@ -171,7 +171,7 @@ func NewNode(config *cfg.Config, // EventBus and IndexerService must be started before the handshake because // we might need to index the txs of the replayed block as this might not have happened // when the node stopped last time (i.e. the node stopped after it saved the block - // but before it indexed the txs, or, endblocker panicked) + // but before it indexed the txs) eventBus, err := createAndStartEventBus(logger) if err != nil { return nil, err diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 92783634c..bc3df8f15 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -74,7 +74,7 @@ type SignClient interface { Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) // TxSearch defines a method to search for a paginated set of transactions by - // DeliverTx event search criteria. + // transaction event search criteria. 
TxSearch( ctx context.Context, query string, @@ -83,8 +83,8 @@ type SignClient interface { orderBy string, ) (*ctypes.ResultTxSearch, error) - // BlockSearch defines a method to search for a paginated set of blocks by - // BeginBlock and EndBlock event search criteria. + // BlockSearch defines a method to search for a paginated set of blocks matching + // FinalizeBlock event search criteria. BlockSearch( ctx context.Context, query string, diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index c3f56d677..08568f5fb 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -191,8 +191,8 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR }, nil } -// BlockSearch searches for a paginated set of blocks matching BeginBlock and -// EndBlock event search criteria. +// BlockSearch searches for a paginated set of blocks matching +// FinalizeBlock event search criteria. func BlockSearch( ctx *rpctypes.Context, query string, diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 24e86c1d6..8c6683ff1 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -17,7 +17,7 @@ import ( // NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!) // BroadcastTxAsync returns right away, with no response. Does not wait for -// CheckTx nor DeliverTx results. +// CheckTx nor transaction results. // More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) @@ -29,7 +29,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca } // BroadcastTxSync returns with the response from CheckTx. Does not wait for -// DeliverTx result. +// the transaction result. // More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_sync func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.ResponseCheckTx, 1) @@ -58,7 +58,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas } } -// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. +// BroadcastTxCommit returns with the responses from CheckTx and ExecTxResult. // More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_commit func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := ctx.RemoteAddr() @@ -73,7 +73,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() q := types.EventQueryTxFor(tx) - deliverTxSub, err := env.EventBus.Subscribe(subCtx, subscriber, q) + txSub, err := env.EventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = fmt.Errorf("failed to subscribe to tx: %w", err) env.Logger.Error("Error on broadcast_tx_commit", "err", err) @@ -111,7 +111,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc // Wait for the tx to be included in a block or timeout. select { - case msg := <-deliverTxSub.Out(): // The tx was included in a block. + case msg := <-txSub.Out(): // The tx was included in a block.
txResultEvent := msg.Data().(types.EventDataTx) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, @@ -119,14 +119,14 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc Hash: tx.Hash(), Height: txResultEvent.Height, }, nil - case <-deliverTxSub.Cancelled(): + case <-txSub.Cancelled(): var reason string - if deliverTxSub.Err() == nil { + if txSub.Err() == nil { reason = "Tendermint exited" } else { - reason = deliverTxSub.Err().Error() + reason = txSub.Err().Error() } - err = fmt.Errorf("deliverTxSub was canceled (reason: %s)", reason) + err = fmt.Errorf("txSub was canceled (reason: %s)", reason) env.Logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 9bc7a9601..e63271bd0 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -181,7 +181,7 @@ type ResultBroadcastTx struct { Hash bytes.HexBytes `json:"hash"` } -// CheckTx and DeliverTx results +// CheckTx and ExecTx results type ResultBroadcastTxCommit struct { CheckTx abci.ResponseCheckTx `json:"check_tx"` TxResult abci.ExecTxResult `json:"tx_result"` diff --git a/state/indexer/block.go b/state/indexer/block.go index 365b3ae08..96328dca0 100644 --- a/state/indexer/block.go +++ b/state/indexer/block.go @@ -15,10 +15,10 @@ type BlockIndexer interface { // upon database query failure. Has(height int64) (bool, error) - // Index indexes BeginBlock and EndBlock events for a given block by its height. + // Index indexes FinalizeBlock events for a given block by its height. Index(types.EventDataNewBlockEvents) error - // Search performs a query for block heights that match a given BeginBlock - // and Endblock event search criteria. + // Search performs a query for block heights that match a given FinalizeBlock + // event search criteria. Search(ctx context.Context, q *query.Query) ([]int64, error) } diff --git a/state/indexer/block/kv/kv.go b/state/indexer/block/kv/kv.go index 1a33ab2fe..b31684119 100644 --- a/state/indexer/block/kv/kv.go +++ b/state/indexer/block/kv/kv.go @@ -20,7 +20,7 @@ import ( var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) -// BlockerIndexer implements a block indexer, indexing BeginBlock and EndBlock +// BlockerIndexer implements a block indexer, indexing FinalizeBlock // events with an underlying KV store. Block events are indexed by their height, // such that matching search criteria returns the respective block height(s). type BlockerIndexer struct { @@ -44,12 +44,11 @@ func (idx *BlockerIndexer) Has(height int64) (bool, error) { return idx.store.Has(key) } -// Index indexes BeginBlock and EndBlock events for a given block by its height. +// Index indexes FinalizeBlock events for a given block by its height. // The following is indexed: // // primary key: encode(block.height | height) => encode(height) -// BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height) -// EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height) +// FinalizeBlock events: encode(eventType.eventAttr|eventValue|height|finalize_block) => encode(height) func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockEvents) error { batch := idx.store.NewBatch() defer batch.Close() @@ -66,15 +65,15 @@ func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockEvents) error { } // 2. 
index block events - if err := idx.indexEvents(batch, bh.Events, "begin_block", height); err != nil { - return fmt.Errorf("failed to index BeginBlock events: %w", err) + if err := idx.indexEvents(batch, bh.Events, "finalize_block", height); err != nil { + return fmt.Errorf("failed to index FinalizeBlock events: %w", err) } return batch.WriteSync() } -// Search performs a query for block heights that match a given BeginBlock -// and Endblock event search criteria. The given query can match against zero, +// Search performs a query for block heights that match a given FinalizeBlock -// event search criteria. The given query can match against zero, // one or more block heights. In the case of height queries, i.e. block.height=H, // if the height is indexed, that height alone will be returned. An error and // nil slice is returned. Otherwise, a non-nil slice and nil error is returned. diff --git a/state/indexer/sink/psql/backport.go b/state/indexer/sink/psql/backport.go index ce4a137db..68efa2aae 100644 --- a/state/indexer/sink/psql/backport.go +++ b/state/indexer/sink/psql/backport.go @@ -24,8 +24,7 @@ import ( ) const ( - eventTypeBeginBlock = "begin_block" - eventTypeEndBlock = "end_block" + eventTypeFinalizeBlock = "finalize_block" ) // TxIndexer returns a bridge from es to the Tendermint v0.34 transaction indexer. diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index 0c71541e5..a2417c8ac 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -165,7 +165,7 @@ INSERT INTO `+tableBlocks+` (height, chain_id, created_at) } // Insert all the block events. Order is important here, if err := insertEvents(dbtx, blockID, 0, h.Events); err != nil { - return fmt.Errorf("begin-block events: %w", err) + return fmt.Errorf("finalize block events: %w", err) } return nil }) diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 2ee777611..b08c2dbf3 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -351,17 +351,8 @@ SELECT height FROM `+tableBlocks+` WHERE height = $1; if err := testDB().QueryRow(` SELECT type, height, chain_id FROM `+viewBlockEvents+` WHERE height = $1 AND type = $2 AND chain_id = $3; -`, height, eventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows { - t.Errorf("No %q event found for height=%d", eventTypeBeginBlock, height) - } else if err != nil { - t.Fatalf("Database query failed: %v", err) - } - - if err := testDB().QueryRow(` -SELECT type, height, chain_id FROM `+viewBlockEvents+` - WHERE height = $1 AND type = $2 AND chain_id = $3; -`, height, eventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows { - t.Errorf("No %q event found for height=%d", eventTypeEndBlock, height) +`, height, eventTypeFinalizeBlock, chainID).Err(); err == sql.ErrNoRows { + t.Errorf("No %q event found for height=%d", eventTypeFinalizeBlock, height) } else if err != nil { t.Fatalf("Database query failed: %v", err) } diff --git a/state/metrics.gen.go b/state/metrics.gen.go index 1ce2c4de1..512bb7a37 100644 --- a/state/metrics.gen.go +++ b/state/metrics.gen.go @@ -18,7 +18,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Namespace: namespace, Subsystem: MetricsSubsystem, Name: "block_processing_time", - Help: "Time between BeginBlock and EndBlock in ms.", + Help: "Time spent processing finalize block.", Buckets: stdprometheus.LinearBuckets(1, 10, 10), }, labels).With(labelsAndValues...), diff --git a/state/metrics.go
b/state/metrics.go index 6c238df76..a37515668 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -14,7 +14,7 @@ const ( // Metrics contains metrics exposed by this package. type Metrics struct { - // Time between BeginBlock and EndBlock in ms. + // Time spent processing FinalizeBlock BlockProcessingTime metrics.Histogram `metrics_buckettype:"lin" metrics_bucketsizes:"1, 10, 10"` // ConsensusParamUpdates is the total number of times the application has diff --git a/state/state.go b/state/state.go index 51ce5a3f8..171a4e713 100644 --- a/state/state.go +++ b/state/state.go @@ -68,7 +68,7 @@ type State struct { LastHeightValidatorsChanged int64 // Consensus parameters used for validating blocks. - // Changes returned by EndBlock and updated after Commit. + // Changes returned by FinalizeBlock and updated after Commit. ConsensusParams types.ConsensusParams LastHeightConsensusParamsChanged int64 diff --git a/state/store_test.go b/state/store_test.go index 6d4715c96..4d7847046 100644 --- a/state/store_test.go +++ b/state/store_test.go @@ -214,11 +214,11 @@ func TestTxResultsHash(t *testing.T) { root := sm.TxResultsHash(txResults) - // root should be Merkle tree root of DeliverTxs responses + // root should be Merkle tree root of ExecTxResult responses results := types.NewResults(txResults) assert.Equal(t, root, results.Hash()) - // test we can prove first DeliverTx + // test we can prove first ExecTxResult proof := results.ProveResult(0) bz, err := results[0].Marshal() require.NoError(t, err) diff --git a/types/block.go b/types/block.go index 5bf1cbfdb..3677be8af 100644 --- a/types/block.go +++ b/types/block.go @@ -342,7 +342,7 @@ type Header struct { ConsensusHash tmbytes.HexBytes `json:"consensus_hash"` // consensus params for current block AppHash tmbytes.HexBytes `json:"app_hash"` // state after txs from the previous block // root hash of all results from the txs from the previous block - // see `deterministicResponseDeliverTx` to understand which parts of a tx is hashed into here + // see `deterministicExecTxResult` to understand which parts of a tx is hashed into here LastResultsHash tmbytes.HexBytes `json:"last_results_hash"` // consensus info diff --git a/types/validation.go b/types/validation.go index 3b33e90db..3601f0479 100644 --- a/types/validation.go +++ b/types/validation.go @@ -19,7 +19,7 @@ func shouldBatchVerify(vals *ValidatorSet, commit *Commit) bool { // // It checks all the signatures! While it's safe to exit as soon as we have // 2/3+ signatures, doing so would impact incentivization logic in the ABCI -// application that depends on the LastCommitInfo sent in BeginBlock, which +// application that depends on the LastCommitInfo sent in FinalizeBlock, which // includes which validators signed. For instance, Gaia incentivizes proposers // with a bonus for including more than +2/3 of the signatures. 
func VerifyCommit(chainID string, vals *ValidatorSet, blockID BlockID, From 15065f898cdb9fd46274f8439b687f00002f584f Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 15:26:02 +0100 Subject: [PATCH 12/16] catch scenario when recovering directly after an upgrade --- consensus/replay.go | 7 +++++++ state/store.go | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/consensus/replay.go b/consensus/replay.go index 450c1101d..095e75d2a 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -423,6 +423,13 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } + // NOTE: There is a rare edge case where a node has upgraded from + // v0.37 with endblock to v0.38 with finalize block and thus + // does not have the app hash saved from the previous height + // here we take the appHash provided from the Info handshake + if len(finalizeBlockResponse.AgreedAppData) == 0 { + finalizeBlockResponse.AgreedAppData = appHash + } mockApp := newMockProxyApp(finalizeBlockResponse) h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(state, storeBlockHeight, mockApp) diff --git a/state/store.go b/state/store.go index 107658d82..cdfce15ea 100644 --- a/state/store.go +++ b/state/store.go @@ -448,6 +448,25 @@ func (store dbStore) LoadLastFinalizeBlockResponse(height int64) (*abci.Response return nil, errors.New("expected height %d but last stored abci responses was at height %d") } + // It is possible if this is called directly after an upgrade that + // ResponseFinalizeBlock is nil. In which case we use the legacy + // ABCI responses + if info.ResponseFinalizeBlock == nil { + // sanity check + if info.LegacyAbciResponses == nil { + panic("state store contains last abci response but it is empty") + } + legacyResp := info.LegacyAbciResponses + return &abci.ResponseFinalizeBlock{ + TxResults: legacyResp.DeliverTxs, + ValidatorUpdates: legacyResp.EndBlock.ValidatorUpdates, + ConsensusParamUpdates: legacyResp.EndBlock.ConsensusParamUpdates, + Events: append(legacyResp.BeginBlock.Events, legacyResp.EndBlock.Events...), + // NOTE: AgreedAppData is missing in the response but will + // be caught and filled in consensus/replay.go + }, nil + } + return info.ResponseFinalizeBlock, nil } From 39ff07e648fa1afbf0695bec6d87188945f0c878 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 22 Nov 2022 15:52:57 +0100 Subject: [PATCH 13/16] fix flakiness in test --- abci/client/socket_client_test.go | 9 ++++----- mempool/v0/clist_mempool_test.go | 3 --- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index 2995c431c..54757eca8 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -53,7 +53,7 @@ func TestHangingAsyncCalls(t *testing.T) { // Call CheckTx reqres, err := c.CheckTxAsync(context.Background(), &types.RequestCheckTx{}) require.NoError(t, err) - // wait 20 ms for all events to travel socket, but + // wait 50 ms for all events to travel socket, but // no response yet from server time.Sleep(50 * time.Millisecond) // kill the server, so the connections break @@ -62,7 +62,6 @@ func TestHangingAsyncCalls(t *testing.T) { // wait for the response from CheckTx reqres.Wait() - fmt.Print(reqres) resp <- c.Error() }() @@ -157,9 +156,9 @@ type slowApp struct { types.BaseApplication } -func (slowApp) CheckTxAsync(_ context.Context, req types.RequestCheckTx) types.ResponseCheckTx { - time.Sleep(200 * time.Millisecond) - return 
types.ResponseCheckTx{} +func (slowApp) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) { + time.Sleep(time.Second) + return &types.ResponseCheckTx{}, nil } // TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when diff --git a/mempool/v0/clist_mempool_test.go b/mempool/v0/clist_mempool_test.go index 069534f9e..01779348a 100644 --- a/mempool/v0/clist_mempool_test.go +++ b/mempool/v0/clist_mempool_test.go @@ -249,7 +249,6 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) { mockClient.On("SetLogger", mock.Anything) mockClient.On("Error").Return(nil).Times(4) - mockClient.On("Flush", mock.Anything).Return(nil) mockClient.On("SetResponseCallback", mock.MatchedBy(func(cb abciclient.Callback) bool { callback = cb; return true })) app := kvstore.NewInMemoryApplication() @@ -290,8 +289,6 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) { req = &abci.RequestCheckTx{Tx: txs[3]} callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp)) mockClient.AssertExpectations(t) - - mockClient.AssertExpectations(t) } func TestMempool_KeepInvalidTxsInCache(t *testing.T) { From 6ce3ccca49c3c4d5ee683ee3425ba5d7331c6f74 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 23 Nov 2022 16:21:23 +0100 Subject: [PATCH 14/16] fix replay test --- abci/example/kvstore/kvstore.go | 5 +- consensus/replay.go | 6 ++- consensus/replay_test.go | 94 ++++++++++++++++++++++----------- 3 files changed, 69 insertions(+), 36 deletions(-) diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index f80a4fd66..bb8149761 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -77,9 +77,6 @@ func NewInMemoryApplication() *Application { // Tendermint will ensure it is in sync with the application by potentially replaying the blocks it has. If the // Application returns a 0 appBlockHeight, Tendermint will call InitChain to initialize the application with consensus related data func (app *Application) Info(_ context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) { - appHash := make([]byte, 8) - binary.PutVarint(appHash, app.state.Size) - // Tendermint expects the application to persist validators, on start-up we need to reload them to memory if they exist if len(app.valAddrToPubKeyMap) == 0 && app.state.Height > 0 { validators := app.getValidators() @@ -97,7 +94,7 @@ func (app *Application) Info(_ context.Context, req *types.RequestInfo) (*types. Version: version.ABCIVersion, AppVersion: AppVersion, LastBlockHeight: app.state.Height, - LastBlockAppHash: appHash, + LastBlockAppHash: app.state.Hash(), }, nil } diff --git a/consensus/replay.go b/consensus/replay.go index 095e75d2a..c87f996b7 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -266,7 +266,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { } // Replay blocks up to the latest in the blockstore. - _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) + appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) if err != nil { return fmt.Errorf("error on replay: %v", err) } @@ -391,6 +391,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. 
if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) + fmt.Println("here3") return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { @@ -404,6 +405,7 @@ func (h *Handshaker) ReplayBlocks( // so we'll need to replay a block using the WAL. switch { case appBlockHeight < stateBlockHeight: + fmt.Println("here2") // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) @@ -414,10 +416,12 @@ func (h *Handshaker) ReplayBlocks( // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT h.logger.Info("Replay last block using real app") + fmt.Println("here4") state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) return state.AppHash, err case appBlockHeight == storeBlockHeight: + fmt.Println("here1") // We ran Commit, but didn't save the state, so replayBlock with mock app. finalizeBlockResponse, err := h.stateStore.LoadLastFinalizeBlockResponse(storeBlockHeight) if err != nil { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 72c87e743..a47afadad 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -32,6 +32,7 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" + smmocks "github.com/tendermint/tendermint/state/mocks" "github.com/tendermint/tendermint/types" ) @@ -311,7 +312,7 @@ const numBlocks = 6 var modes = []uint{0, 1, 2, 3} // This is actually not a test, it's for storing validator change tx data for testHandshakeReplay -func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, []*types.Block, []*types.Commit, sm.State) { +func setupChainWithChangingValidators(t *testing.T, name string, nBlocks int) (*cfg.Config, []*types.Block, []*types.Commit, sm.State) { nPeers := 7 nVals := 4 css, genDoc, config, cleanup := randConsensusNetWithPeers( @@ -320,7 +321,9 @@ func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, [ nPeers, name, newMockTickerFunc(true), - newPersistentKVStoreWithPath) + func(_ string) abci.Application { + return newKVStore() + }) genesisState, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) t.Cleanup(cleanup) @@ -536,7 +539,7 @@ func setupChainWithChangingValidators(t *testing.T, name string) (*cfg.Config, [ chain := make([]*types.Block, 0) commits := make([]*types.Commit, 0) - for i := 1; i <= numBlocks; i++ { + for i := 1; i <= nBlocks; i++ { chain = append(chain, css[0].blockStore.LoadBlock(int64(i))) commits = append(commits, css[0].blockStore.LoadBlockCommit(int64(i))) } @@ -558,24 +561,36 @@ func TestHandshakeReplayAll(t *testing.T) { // Sync many, not from scratch func TestHandshakeReplaySome(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, 2, m, false) - testHandshakeReplay(t, config, 2, m, true) + t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) { + testHandshakeReplay(t, config, 2, m, false) + }) + t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) { + testHandshakeReplay(t, config, 2, m, true) + }) } } // Sync from lagging by one func TestHandshakeReplayOne(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, 
numBlocks-1, m, false) - testHandshakeReplay(t, config, numBlocks-1, m, true) + t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) { + testHandshakeReplay(t, config, numBlocks-1, m, false) + }) + t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) { + testHandshakeReplay(t, config, numBlocks-1, m, true) + }) } } // Sync from caught up func TestHandshakeReplayNone(t *testing.T) { for _, m := range modes { - testHandshakeReplay(t, config, numBlocks, m, false) - testHandshakeReplay(t, config, numBlocks, m, true) + t.Run(fmt.Sprintf("mode_%d_single", m), func(t *testing.T) { + testHandshakeReplay(t, config, numBlocks, m, false) + }) + t.Run(fmt.Sprintf("mode_%d_multi", m), func(t *testing.T) { + testHandshakeReplay(t, config, numBlocks, m, true) + }) } } @@ -609,7 +624,8 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin ) if testValidatorsChange { - testConfig, chain, commits, genesisState = setupChainWithChangingValidators(t, fmt.Sprintf("%d_%d_m", nBlocks, mode)) + testConfig, chain, commits, genesisState = setupChainWithChangingValidators(t, fmt.Sprintf("%d_%d_m", nBlocks, mode), numBlocks) + stateDB = dbm.NewMemDB() store = newMockBlockStore(t, config, genesisState.ConsensusParams) } else { testConfig = ResetConfig(fmt.Sprintf("%d_%d_s", nBlocks, mode)) @@ -619,9 +635,9 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin walBody, err := WALWithNBlocks(t, numBlocks, testConfig) require.NoError(t, err) walFile := tempWALWithData(walBody) - config.Consensus.SetWalFile(walFile) + testConfig.Consensus.SetWalFile(walFile) - privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) + privVal := privval.LoadFilePV(testConfig.PrivValidatorKeyFile(), testConfig.PrivValidatorStateFile()) wal, err := NewWAL(walFile) require.NoError(t, err) @@ -637,7 +653,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin require.NoError(t, err) pubKey, err := privVal.GetPubKey() require.NoError(t, err) - stateDB, genesisState, store = stateAndStore(t, config, pubKey, kvstore.AppVersion) + stateDB, genesisState, store = stateAndStore(t, testConfig, pubKey, kvstore.AppVersion) } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ @@ -651,12 +667,11 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin state := genesisState.Copy() // run the chain through state.ApplyBlock to build up the tendermint state - state = buildTMStateFromChain(t, config, stateStore, mempool, evpool, state, chain, nBlocks, mode, store) - latestAppHash := state.AppHash + state, latestAppHash := buildTMStateFromChain(t, testConfig, stateStore, mempool, evpool, state, chain, nBlocks, mode, store) // make a new client creator kvstoreApp := kvstore.NewPersistentApplication( - filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode))) + filepath.Join(testConfig.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode))) t.Cleanup(func() { _ = kvstoreApp.Close() }) @@ -667,12 +682,12 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin // use a throwaway tendermint state proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() - stateStore := sm.NewStore(stateDB1, sm.StoreOptions{ + dummyStateStore := sm.NewStore(stateDB1, sm.StoreOptions{ DiscardABCIResponses: false, }) - err := stateStore.Save(genesisState) + err := dummyStateStore.Save(genesisState) require.NoError(t, err) - 
buildAppStateFromChain(t, proxyApp, stateStore, mempool, evpool, genesisState, chain, nBlocks, mode, store) + buildAppStateFromChain(t, proxyApp, dummyStateStore, mempool, evpool, genesisState, chain, nBlocks, mode, store) } // Prune block store if requested @@ -685,7 +700,8 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin } // now start the app using the handshake - it should sync - genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) + genDoc, err := sm.MakeGenesisDocFromFile(testConfig.GenesisFile()) + require.NoError(t, err) handshaker := NewHandshaker(stateStore, state, store, genDoc) proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { @@ -698,19 +714,29 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin } }) - err := handshaker.Handshake(proxyApp) + // perform the replay protocol to sync Tendermint and the application + err = handshaker.Handshake(proxyApp) if expectError { require.Error(t, err) return - } else if err != nil { - t.Fatalf("Error on abci handshake: %v", err) + } else { + require.NoError(t, err) } // get the latest app hash from the app - res, err := proxyApp.Query().Info(context.Background(), &abci.RequestInfo{Version: ""}) - if err != nil { - t.Fatal(err) - } + res, err := proxyApp.Query().Info(context.Background(), proxy.RequestInfo) + require.NoError(t, err) + + // block store and app height should be in sync + require.Equal(t, store.Height(), res.LastBlockHeight) + + // tendermint state height and app height should be in sync + state, err = stateStore.Load() + require.NoError(t, err) + require.Equal(t, state.LastBlockHeight, res.LastBlockHeight) + require.Equal(t, int64(numBlocks), res.LastBlockHeight) + + fmt.Printf("mode: %d, appHash: %X, data: %s\n", mode, latestAppHash, res.Data) // the app hash should be synced up if !bytes.Equal(latestAppHash, res.LastBlockAppHash) { @@ -795,7 +821,7 @@ func buildTMStateFromChain( chain []*types.Block, nBlocks int, mode uint, - bs sm.BlockStore) sm.State { + bs sm.BlockStore) (sm.State, []byte) { // run the whole chain against this client to build up the tendermint state clientCreator := proxy.NewLocalClientCreator( kvstore.NewPersistentApplication( @@ -822,6 +848,7 @@ func buildTMStateFromChain( for _, block := range chain { state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs) } + return state, state.AppHash case 1, 2, 3: // sync up to the penultimate as if we stored the block. 
@@ -830,14 +857,19 @@ func buildTMStateFromChain( state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs) } + dummyStateStore := &smmocks.Store{} + vals, _ := stateStore.LoadValidators(int64(len(chain) - 1)) + dummyStateStore.On("LoadValidators", int64(5)).Return(vals, nil) + dummyStateStore.On("Save", mock.Anything).Return(nil) + dummyStateStore.On("SaveFinalizeBlockResponse", mock.Anything, mock.Anything).Return(nil) + // apply the final block to a state copy so we can // get the right next appHash but keep the state back - applyBlock(t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, bs) + s := applyBlock(t, dummyStateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, bs) + return state, s.AppHash default: panic(fmt.Sprintf("unknown mode %v", mode)) } - - return state } func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { From dc17927d2db303e65bfd6c4b2bf3e913553254ee Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 23 Nov 2022 18:27:01 +0100 Subject: [PATCH 15/16] fix replay_test --- consensus/replay.go | 4 ---- consensus/replay_test.go | 22 ++++++++++++++-------- state/store.go | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index c87f996b7..ea9b2ff46 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -391,7 +391,6 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - fmt.Println("here3") return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { @@ -405,7 +404,6 @@ func (h *Handshaker) ReplayBlocks( // so we'll need to replay a block using the WAL. switch { case appBlockHeight < stateBlockHeight: - fmt.Println("here2") // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) @@ -416,12 +414,10 @@ func (h *Handshaker) ReplayBlocks( // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT h.logger.Info("Replay last block using real app") - fmt.Println("here4") state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) return state.AppHash, err case appBlockHeight == storeBlockHeight: - fmt.Println("here1") // We ran Commit, but didn't save the state, so replayBlock with mock app. finalizeBlockResponse, err := h.stateStore.LoadLastFinalizeBlockResponse(storeBlockHeight) if err != nil { diff --git a/consensus/replay_test.go b/consensus/replay_test.go index a47afadad..f82dea501 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -306,9 +306,9 @@ const numBlocks = 6 // Test handshake/replay // 0 - all synced up -// 1 - saved block but app and state are behind -// 2 - save block and committed but state is behind -// 3 - save block and committed with truncated block store and state behind +// 1 - saved block but app and state are behind by one height +// 2 - save block and committed (i.e. 
app got `Commit`) but state is behind +// 3 - same as 2 but with a truncated block store var modes = []uint{0, 1, 2, 3} // This is actually not a test, it's for storing validator change tx data for testHandshakeReplay @@ -736,8 +736,6 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin require.Equal(t, state.LastBlockHeight, res.LastBlockHeight) require.Equal(t, int64(numBlocks), res.LastBlockHeight) - fmt.Printf("mode: %d, appHash: %X, data: %s\n", mode, latestAppHash, res.Data) - // the app hash should be synced up if !bytes.Equal(latestAppHash, res.LastBlockAppHash) { t.Fatalf( @@ -800,9 +798,12 @@ func buildAppStateFromChain(t *testing.T, proxyApp proxy.AppConns, stateStore sm state = applyBlock(t, stateStore, mempool, evpool, state, block, proxyApp, bs) } + // mode 1 only the block at the last height is saved + // mode 2 and 3, the block is saved, commit is called, but the state is not saved if mode == 2 || mode == 3 { // update the kvstore height and apphash // as if we ran commit but not + // here we expect a dummy state store to be used state = applyBlock(t, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, bs) } default: @@ -858,10 +859,15 @@ func buildTMStateFromChain( } dummyStateStore := &smmocks.Store{} - vals, _ := stateStore.LoadValidators(int64(len(chain) - 1)) - dummyStateStore.On("LoadValidators", int64(5)).Return(vals, nil) + lastHeight := int64(len(chain)) + penultimateHeight := int64(len(chain) - 1) + vals, _ := stateStore.LoadValidators(penultimateHeight) + dummyStateStore.On("LoadValidators", penultimateHeight).Return(vals, nil) dummyStateStore.On("Save", mock.Anything).Return(nil) - dummyStateStore.On("SaveFinalizeBlockResponse", mock.Anything, mock.Anything).Return(nil) + dummyStateStore.On("SaveFinalizeBlockResponse", lastHeight, mock.MatchedBy(func(response *abci.ResponseFinalizeBlock) bool { + stateStore.SaveFinalizeBlockResponse(lastHeight, response) + return true + })).Return(nil) // apply the final block to a state copy so we can // get the right next appHash but keep the state back diff --git a/state/store.go b/state/store.go index cdfce15ea..f2da03ec3 100644 --- a/state/store.go +++ b/state/store.go @@ -445,7 +445,7 @@ func (store dbStore) LoadLastFinalizeBlockResponse(height int64) (*abci.Response // Here we validate the result by comparing its height to the expected height. 
if height != info.GetHeight() { - return nil, errors.New("expected height %d but last stored abci responses was at height %d") + return nil, fmt.Errorf("expected height %d but last stored abci responses was at height %d", height, info.GetHeight()) } // It is possible if this is called directly after an upgrade that From 27c1d6e3a75bc92dd9d0d0fdcb52101d881321b2 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 23 Nov 2022 18:30:52 +0100 Subject: [PATCH 16/16] cleanup test --- consensus/replay_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index f82dea501..df88ac096 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -718,10 +718,10 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin err = handshaker.Handshake(proxyApp) if expectError { require.Error(t, err) + // finish the test early return - } else { - require.NoError(t, err) } + require.NoError(t, err) // get the latest app hash from the app res, err := proxyApp.Query().Info(context.Background(), proxy.RequestInfo) @@ -865,7 +865,7 @@ func buildTMStateFromChain( dummyStateStore.On("LoadValidators", penultimateHeight).Return(vals, nil) dummyStateStore.On("Save", mock.Anything).Return(nil) dummyStateStore.On("SaveFinalizeBlockResponse", lastHeight, mock.MatchedBy(func(response *abci.ResponseFinalizeBlock) bool { - stateStore.SaveFinalizeBlockResponse(lastHeight, response) + require.NoError(t, stateStore.SaveFinalizeBlockResponse(lastHeight, response)) return true })).Return(nil)