Compare commits

...

3 Commits

Author SHA1 Message Date
William Banfield
8dfd011a7f americanize canceled 2022-02-18 16:22:01 -05:00
William Banfield
d9c9f3277d Merge branch 'master' into wb/undo-queue-buffer-limit 2022-02-18 16:17:45 -05:00
William Banfield
da767e732c abci: undo socket buffer limit 2022-02-18 15:47:31 -05:00

View File

@@ -207,7 +207,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
//----------------------------------------
func (cli *socketClient) Flush(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush())
if err != nil {
return queueErr(err)
}
@@ -389,29 +389,22 @@ func (cli *socketClient) FinalizeBlock(
//----------------------------------------
// queueRequest enqueues req onto the queue. If the queue is full, it ether
// returns an error (sync=false) or blocks (sync=true).
//
// When sync=true, ctx can be used to break early. When sync=false, ctx will be
// used later to determine if request should be dropped (if ctx.Err is
// non-nil).
// queueRequest enqueues req onto the queue. The request can break early if the
// the context is canceled. If the queue is full, this method blocks to allow
// the request to be placed onto the queue. This has the effect of creating an
// unbounded queue of goroutines waiting to write to this queue which is a bit
// antithetical to the purposes of a queue, however, undoing this behavior has
// dangerous upstream implications as a result of the usage of this behavior upstream.
// Remove at your peril.
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) {
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
if sync {
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
select {
case cli.reqQueue <- reqres:
default:
return nil, errors.New("buffer is full")
}
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
return reqres, nil
@@ -422,7 +415,7 @@ func (cli *socketClient) queueRequestAndFlush(
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req, true)
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}