From da6ec8f08294d1e4775bbd88c568ae545c762aa8 Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Wed, 13 Apr 2022 15:30:36 -0400 Subject: [PATCH] invoke callbacks when set late in socket client (#8331) --- abci/client/client.go | 26 ++++++------ abci/client/grpc_client.go | 5 +-- abci/client/local_client.go | 5 ++- abci/client/socket_client_test.go | 69 +++++++++++++++++++++++++++++++ 4 files changed, 86 insertions(+), 19 deletions(-) diff --git a/abci/client/client.go b/abci/client/client.go index c5c1ab219..c5c7c82b3 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -81,9 +81,15 @@ type ReqRes struct { *sync.WaitGroup *types.Response // Not set atomically, so be sure to use WaitGroup. - mtx tmsync.Mutex - done bool // Gets set to true once *after* WaitGroup.Done(). - cb func(*types.Response) // A single callback that may be set. + mtx tmsync.Mutex + + // callbackInvoked as a variable to track if the callback was already + // invoked during the regular execution of the request. This variable + // allows clients to set the callback simultaneously without potentially + // invoking the callback twice by accident, once when 'SetCallback' is + // called and once during the normal request. + callbackInvoked bool + cb func(*types.Response) // A single callback that may be set. } func NewReqRes(req *types.Request) *ReqRes { @@ -92,8 +98,8 @@ func NewReqRes(req *types.Request) *ReqRes { WaitGroup: waitGroup1(), Response: nil, - done: false, - cb: nil, + callbackInvoked: false, + cb: nil, } } @@ -103,7 +109,7 @@ func NewReqRes(req *types.Request) *ReqRes { func (r *ReqRes) SetCallback(cb func(res *types.Response)) { r.mtx.Lock() - if r.done { + if r.callbackInvoked { r.mtx.Unlock() cb(r.Response) return @@ -122,6 +128,7 @@ func (r *ReqRes) InvokeCallback() { if r.cb != nil { r.cb(r.Response) } + r.callbackInvoked = true } // GetCallback returns the configured callback of the ReqRes object which may be @@ -136,13 +143,6 @@ func (r *ReqRes) GetCallback() func(*types.Response) { return r.cb } -// SetDone marks the ReqRes object as done. -func (r *ReqRes) SetDone() { - r.mtx.Lock() - r.done = true - r.mtx.Unlock() -} - func waitGroup1() (wg *sync.WaitGroup) { wg = &sync.WaitGroup{} wg.Add(1) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 0a0b100e6..6d01a7503 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -66,7 +66,6 @@ func (cli *grpcClient) OnStart() error { cli.mtx.Lock() defer cli.mtx.Unlock() - reqres.SetDone() reqres.Done() // Notify client listener if set @@ -75,9 +74,7 @@ func (cli *grpcClient) OnStart() error { } // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(reqres.Response) - } + reqres.InvokeCallback() } for reqres := range cli.chReqRes { if reqres != nil { diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 01fc82825..62b0942c1 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -327,12 +327,13 @@ func (app *localClient) ApplySnapshotChunkSync( func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes { app.Callback(req, res) - return newLocalReqRes(req, res) + rr := newLocalReqRes(req, res) + rr.callbackInvoked = true + return rr } func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes { reqRes := NewReqRes(req) reqRes.Response = res - reqRes.SetDone() return reqRes } diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index 90b894b71..9bf28fe12 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -2,6 +2,7 @@ package abcicli_test import ( "fmt" + "sync" "testing" "time" @@ -118,3 +119,71 @@ func (slowApp) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock time.Sleep(200 * time.Millisecond) return types.ResponseBeginBlock{} } + +// TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when +// set after the client completes the call into the app. Currently this +// test relies on the callback being allowed to be invoked twice if set multiple +// times, once when set early and once when set late. +func TestCallbackInvokedWhenSetLate(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + app := blockedABCIApplication{ + wg: wg, + } + _, c := setupClientServer(t, app) + reqRes := c.CheckTxAsync(types.RequestCheckTx{}) + + done := make(chan struct{}) + cb := func(_ *types.Response) { + close(done) + } + reqRes.SetCallback(cb) + app.wg.Done() + <-done + + var called bool + cb = func(_ *types.Response) { + called = true + } + reqRes.SetCallback(cb) + require.True(t, called) +} + +type blockedABCIApplication struct { + wg *sync.WaitGroup + types.BaseApplication +} + +func (b blockedABCIApplication) CheckTx(r types.RequestCheckTx) types.ResponseCheckTx { + b.wg.Wait() + return b.BaseApplication.CheckTx(r) +} + +// TestCallbackInvokedWhenSetEarly ensures that the callback is invoked when +// set before the client completes the call into the app. +func TestCallbackInvokedWhenSetEarly(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + app := blockedABCIApplication{ + wg: wg, + } + _, c := setupClientServer(t, app) + reqRes := c.CheckTxAsync(types.RequestCheckTx{}) + + done := make(chan struct{}) + cb := func(_ *types.Response) { + close(done) + } + reqRes.SetCallback(cb) + app.wg.Done() + + called := func() bool { + select { + case <-done: + return true + default: + return false + } + } + require.Eventually(t, called, time.Second, time.Millisecond*25) +}