From c45367e22c8fad4c46c5cd4026df0ab805887461 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 13 Apr 2022 14:09:49 -0400 Subject: [PATCH] rpc: avoid leaking threads (#8328) --- internal/rpc/core/mempool.go | 122 ++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 52 deletions(-) diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index c2a9084db..a852aeb1d 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -38,23 +38,32 @@ func (env *Environment) BroadcastTxSync(ctx context.Context, tx types.Tx) (*core err := env.Mempool.CheckTx( ctx, tx, - func(res *abci.ResponseCheckTx) { resCh <- res }, + func(res *abci.ResponseCheckTx) { + select { + case <-ctx.Done(): + case resCh <- res: + } + }, mempool.TxInfo{}, ) if err != nil { return nil, err } - r := <-resCh + select { + case <-ctx.Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err()) + case r := <-resCh: + return &coretypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + Codespace: r.Codespace, + MempoolError: r.MempoolError, + Hash: tx.Hash(), + }, nil + } - return &coretypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - Codespace: r.Codespace, - MempoolError: r.MempoolError, - Hash: tx.Hash(), - }, nil } // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. @@ -64,61 +73,70 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*co err := env.Mempool.CheckTx( ctx, tx, - func(res *abci.ResponseCheckTx) { resCh <- res }, + func(res *abci.ResponseCheckTx) { + select { + case <-ctx.Done(): + case resCh <- res: + } + }, mempool.TxInfo{}, ) if err != nil { return nil, err } - r := <-resCh - if r.Code != abci.CodeTypeOK { - return &coretypes.ResultBroadcastTxCommit{ - CheckTx: *r, - Hash: tx.Hash(), - }, fmt.Errorf("transaction encountered error (%s)", r.MempoolError) - } - - if !indexer.KVSinkEnabled(env.EventSinks) { - return &coretypes.ResultBroadcastTxCommit{ + select { + case <-ctx.Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err()) + case r := <-resCh: + if r.Code != abci.CodeTypeOK { + return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, Hash: tx.Hash(), - }, - errors.New("cannot confirm transaction because kvEventSink is not enabled") - } + }, fmt.Errorf("transaction encountered error (%s)", r.MempoolError) + } - startAt := time.Now() - timer := time.NewTimer(0) - defer timer.Stop() - - count := 0 - for { - count++ - select { - case <-ctx.Done(): - env.Logger.Error("error on broadcastTxCommit", - "duration", time.Since(startAt), - "err", err) + if !indexer.KVSinkEnabled(env.EventSinks) { return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, Hash: tx.Hash(), - }, fmt.Errorf("timeout waiting for commit of tx %s (%s)", - tx.Hash(), time.Since(startAt)) - case <-timer.C: - txres, err := env.Tx(ctx, tx.Hash(), false) - if err != nil { - jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec - backoff := 100 * time.Duration(count) * time.Millisecond - timer.Reset(jitter + backoff) - continue - } + }, + errors.New("cannot confirm transaction because kvEventSink is not enabled") + } - return &coretypes.ResultBroadcastTxCommit{ - CheckTx: *r, - TxResult: txres.TxResult, - Hash: tx.Hash(), - Height: txres.Height, - }, nil + startAt := time.Now() + timer := time.NewTimer(0) + defer timer.Stop() + + count := 0 + for { + count++ + select { + case <-ctx.Done(): + env.Logger.Error("error on broadcastTxCommit", + "duration", time.Since(startAt), + "err", err) + return &coretypes.ResultBroadcastTxCommit{ + CheckTx: *r, + Hash: tx.Hash(), + }, fmt.Errorf("timeout waiting for commit of tx %s (%s)", + tx.Hash(), time.Since(startAt)) + case <-timer.C: + txres, err := env.Tx(ctx, tx.Hash(), false) + if err != nil { + jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec + backoff := 100 * time.Duration(count) * time.Millisecond + timer.Reset(jitter + backoff) + continue + } + + return &coretypes.ResultBroadcastTxCommit{ + CheckTx: *r, + TxResult: txres.TxResult, + Hash: tx.Hash(), + Height: txres.Height, + }, nil + } } } }