mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 22:05:18 +00:00
abci/grpc: return async responses in order (#5520)
Fixes #5439. This is really a workaround for #5519 (unless we require async implementations to return ordered responses, but that kind of defeats the purpose of having an async API).
This commit is contained in:
@@ -26,4 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
|
||||
### BUG FIXES
|
||||
|
||||
- [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker)
|
||||
|
||||
- [types] \#5523 Change json naming of `PartSetHeader` within `BlockID` from `parts` to `part_set_header` (@marbar3778)
|
||||
|
||||
@@ -22,8 +22,9 @@ type grpcClient struct {
|
||||
service.BaseService
|
||||
mustConnect bool
|
||||
|
||||
client types.ABCIApplicationClient
|
||||
conn *grpc.ClientConn
|
||||
client types.ABCIApplicationClient
|
||||
conn *grpc.ClientConn
|
||||
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
|
||||
|
||||
mtx tmsync.Mutex
|
||||
addr string
|
||||
@@ -35,6 +36,13 @@ func NewGRPCClient(addr string, mustConnect bool) Client {
|
||||
cli := &grpcClient{
|
||||
addr: addr,
|
||||
mustConnect: mustConnect,
|
||||
// Buffering the channel is needed to make calls appear asynchronous,
|
||||
// which is required when the caller makes multiple async calls before
|
||||
// processing callbacks (e.g. due to holding locks). 64 means that a
|
||||
// caller can make up to 64 async calls before a callback must be
|
||||
// processed (otherwise it deadlocks). It also means that we can make 64
|
||||
// gRPC calls while processing a slow callback at the channel head.
|
||||
chReqRes: make(chan *ReqRes, 64),
|
||||
}
|
||||
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
|
||||
return cli
|
||||
@@ -48,6 +56,34 @@ func (cli *grpcClient) OnStart() error {
|
||||
if err := cli.BaseService.OnStart(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// This processes asynchronous request/response messages and dispatches
|
||||
// them to callbacks.
|
||||
go func() {
|
||||
// Use a separate function to use defer for mutex unlocks (this handles panics)
|
||||
callCb := func(reqres *ReqRes) {
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
|
||||
// Notify client listener if set
|
||||
if cli.resCb != nil {
|
||||
cli.resCb(reqres.Request, reqres.Response)
|
||||
}
|
||||
|
||||
// Notify reqRes listener if set
|
||||
if cb := reqres.GetCallback(); cb != nil {
|
||||
cb(reqres.Response)
|
||||
}
|
||||
}
|
||||
for reqres := range cli.chReqRes {
|
||||
if reqres != nil {
|
||||
callCb(reqres)
|
||||
} else {
|
||||
cli.Logger.Error("Received nil reqres")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
RETRY_LOOP:
|
||||
for {
|
||||
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
|
||||
@@ -85,6 +121,7 @@ func (cli *grpcClient) OnStop() {
|
||||
if cli.conn != nil {
|
||||
cli.conn.Close()
|
||||
}
|
||||
close(cli.chReqRes)
|
||||
}
|
||||
|
||||
func (cli *grpcClient) StopForError(err error) {
|
||||
@@ -254,26 +291,10 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot
|
||||
|
||||
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
|
||||
reqres := NewReqRes(req)
|
||||
reqres.Response = res // Set response
|
||||
reqres.Done() // Release waiters
|
||||
reqres.SetDone() // so reqRes.SetCallback will run the callback
|
||||
|
||||
// goroutine for callbacks
|
||||
go func() {
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
|
||||
// Notify client listener if set
|
||||
if cli.resCb != nil {
|
||||
cli.resCb(reqres.Request, res)
|
||||
}
|
||||
|
||||
// Notify reqRes listener if set
|
||||
if cb := reqres.GetCallback(); cb != nil {
|
||||
cb(res)
|
||||
}
|
||||
}()
|
||||
|
||||
reqres.Response = res // Set response
|
||||
reqres.Done() // Release waiters
|
||||
reqres.SetDone() // so reqRes.SetCallback will run the callback
|
||||
cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
|
||||
return reqres
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user