mirror of
https://github.com/tendermint/tendermint.git
synced 2025-12-23 06:15:19 +00:00
This commit is contained in:
@@ -81,7 +81,7 @@ type ReqRes struct {
|
||||
*sync.WaitGroup
|
||||
*types.Response // Not set atomically, so be sure to use WaitGroup.
|
||||
|
||||
mtx tmsync.RWMutex
|
||||
mtx tmsync.Mutex
|
||||
done bool // Gets set to true once *after* WaitGroup.Done().
|
||||
cb func(*types.Response) // A single callback that may be set.
|
||||
}
|
||||
@@ -131,16 +131,16 @@ func (r *ReqRes) InvokeCallback() {
|
||||
//
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5439
|
||||
func (r *ReqRes) GetCallback() func(*types.Response) {
|
||||
r.mtx.RLock()
|
||||
defer r.mtx.RUnlock()
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
return r.cb
|
||||
}
|
||||
|
||||
// SetDone marks the ReqRes object as done.
|
||||
func (r *ReqRes) SetDone() {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
r.done = true
|
||||
r.mtx.Unlock()
|
||||
}
|
||||
|
||||
func waitGroup1() (wg *sync.WaitGroup) {
|
||||
|
||||
@@ -27,7 +27,7 @@ type grpcClient struct {
|
||||
conn *grpc.ClientConn
|
||||
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
|
||||
|
||||
mtx tmsync.RWMutex
|
||||
mtx tmsync.Mutex
|
||||
addr string
|
||||
err error
|
||||
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||
@@ -146,8 +146,8 @@ func (cli *grpcClient) StopForError(err error) {
|
||||
}
|
||||
|
||||
func (cli *grpcClient) Error() error {
|
||||
cli.mtx.RLock()
|
||||
defer cli.mtx.RUnlock()
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
return cli.err
|
||||
}
|
||||
|
||||
@@ -155,8 +155,8 @@ func (cli *grpcClient) Error() error {
|
||||
// NOTE: callback may get internally generated flush responses.
|
||||
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
cli.resCb = resCb
|
||||
cli.mtx.Unlock()
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
@@ -15,7 +15,7 @@ var _ Client = (*localClient)(nil)
|
||||
type localClient struct {
|
||||
service.BaseService
|
||||
|
||||
mtx *tmsync.RWMutex
|
||||
mtx *tmsync.Mutex
|
||||
types.Application
|
||||
Callback
|
||||
}
|
||||
@@ -26,24 +26,22 @@ var _ Client = (*localClient)(nil)
|
||||
// methods of the given app.
|
||||
//
|
||||
// Both Async and Sync methods ignore the given context.Context parameter.
|
||||
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
|
||||
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
|
||||
if mtx == nil {
|
||||
mtx = &tmsync.RWMutex{}
|
||||
mtx = new(tmsync.Mutex)
|
||||
}
|
||||
|
||||
cli := &localClient{
|
||||
mtx: mtx,
|
||||
Application: app,
|
||||
}
|
||||
|
||||
cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
|
||||
return cli
|
||||
}
|
||||
|
||||
func (app *localClient) SetResponseCallback(cb Callback) {
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
app.Callback = cb
|
||||
app.mtx.Unlock()
|
||||
}
|
||||
|
||||
// TODO: change types.Application to include Error()?
|
||||
@@ -67,8 +65,8 @@ func (app *localClient) EchoAsync(msg string) *ReqRes {
|
||||
}
|
||||
|
||||
func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
|
||||
app.mtx.RLock()
|
||||
defer app.mtx.RUnlock()
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
res := app.Application.Info(req)
|
||||
return app.callback(
|
||||
@@ -111,8 +109,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
|
||||
}
|
||||
|
||||
func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
|
||||
app.mtx.RLock()
|
||||
defer app.mtx.RUnlock()
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
res := app.Application.Query(req)
|
||||
return app.callback(
|
||||
@@ -220,8 +218,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
|
||||
}
|
||||
|
||||
func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
|
||||
app.mtx.RLock()
|
||||
defer app.mtx.RUnlock()
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
res := app.Application.Info(req)
|
||||
return &res, nil
|
||||
@@ -252,8 +250,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh
|
||||
}
|
||||
|
||||
func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
|
||||
app.mtx.RLock()
|
||||
defer app.mtx.RUnlock()
|
||||
app.mtx.Lock()
|
||||
defer app.mtx.Unlock()
|
||||
|
||||
res := app.Application.Query(req)
|
||||
return &res, nil
|
||||
|
||||
@@ -34,7 +34,7 @@ type socketClient struct {
|
||||
reqQueue chan *ReqRes
|
||||
flushTimer *timer.ThrottleTimer
|
||||
|
||||
mtx tmsync.RWMutex
|
||||
mtx tmsync.Mutex
|
||||
err error
|
||||
reqSent *list.List // list of requests sent, waiting for response
|
||||
resCb func(*types.Request, *types.Response) // called on all requests, if set.
|
||||
@@ -99,8 +99,8 @@ func (cli *socketClient) OnStop() {
|
||||
|
||||
// Error returns an error if the client was stopped abruptly.
|
||||
func (cli *socketClient) Error() error {
|
||||
cli.mtx.RLock()
|
||||
defer cli.mtx.RUnlock()
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
return cli.err
|
||||
}
|
||||
|
||||
@@ -110,8 +110,8 @@ func (cli *socketClient) Error() error {
|
||||
// NOTE: callback may get internally generated flush responses.
|
||||
func (cli *socketClient) SetResponseCallback(resCb Callback) {
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
cli.resCb = resCb
|
||||
cli.mtx.Unlock()
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user