diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 899d51635..ac7ac7289 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -37,24 +37,32 @@ func (env *Environment) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*co err := env.Mempool.CheckTx( ctx.Context(), tx, - func(res *abci.Response) { resCh <- res }, + func(res *abci.Response) { + select { + case <-ctx.Context().Done(): + case resCh <- res: + } + }, mempool.TxInfo{}, ) if err != nil { return nil, err } - res := <-resCh - r := res.GetCheckTx() - - return &coretypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - Codespace: r.Codespace, - MempoolError: r.MempoolError, - Hash: tx.Hash(), - }, nil + select { + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case res := <-resCh: + r := res.GetCheckTx() + return &coretypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + Codespace: r.Codespace, + MempoolError: r.MempoolError, + Hash: tx.Hash(), + }, nil + } } // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. @@ -64,61 +72,71 @@ func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (* err := env.Mempool.CheckTx( ctx.Context(), tx, - func(res *abci.Response) { resCh <- res }, + func(res *abci.Response) { + select { + case <-ctx.Context().Done(): + case resCh <- res: + } + }, mempool.TxInfo{}, ) if err != nil { return nil, err } - r := (<-resCh).GetCheckTx() - if r.Code != abci.CodeTypeOK { - return &coretypes.ResultBroadcastTxCommit{ - CheckTx: *r, - Hash: tx.Hash(), - }, fmt.Errorf("transaction encountered error (%s)", r.MempoolError) - } - - if !indexer.KVSinkEnabled(env.EventSinks) { - return &coretypes.ResultBroadcastTxCommit{ + select { + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case res := <-resCh: + r := res.GetCheckTx() + if r.Code != abci.CodeTypeOK { + return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, Hash: tx.Hash(), - }, - errors.New("cannot wait for commit because kvEventSync is not enabled") - } + }, fmt.Errorf("transaction encountered error (%s)", r.MempoolError) + } - startAt := time.Now() - timer := time.NewTimer(0) - defer timer.Stop() - - count := 0 - for { - count++ - select { - case <-ctx.Context().Done(): - env.Logger.Error("Error on broadcastTxCommit", - "duration", time.Since(startAt), - "err", err) + if !indexer.KVSinkEnabled(env.EventSinks) { return &coretypes.ResultBroadcastTxCommit{ CheckTx: *r, Hash: tx.Hash(), - }, fmt.Errorf("timeout waiting for commit of tx %s (%s)", - tx.Hash(), time.Since(startAt)) - case <-timer.C: - txres, err := env.Tx(ctx, tx.Hash(), false) - if err != nil { - jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec - backoff := 100 * time.Duration(count) * time.Millisecond - timer.Reset(jitter + backoff) - continue - } + }, + errors.New("cannot confirm transaction because kvEventSink is not enabled") + } - return &coretypes.ResultBroadcastTxCommit{ - CheckTx: *r, - DeliverTx: txres.TxResult, - Hash: tx.Hash(), - Height: txres.Height, - }, nil + startAt := time.Now() + timer := time.NewTimer(0) + defer timer.Stop() + + count := 0 + for { + count++ + select { + case <-ctx.Context().Done(): + env.Logger.Error("error on broadcastTxCommit", + "duration", time.Since(startAt), + "err", err) + return &coretypes.ResultBroadcastTxCommit{ + CheckTx: *r, + Hash: tx.Hash(), + }, fmt.Errorf("timeout waiting for commit of tx %s (%s)", + tx.Hash(), time.Since(startAt)) + case <-timer.C: + txres, err := env.Tx(ctx, tx.Hash(), false) + if err != nil { + jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec + backoff := 100 * time.Duration(count) * time.Millisecond + timer.Reset(jitter + backoff) + continue + } + + return &coretypes.ResultBroadcastTxCommit{ + CheckTx: *r, + DeliverTx: txres.TxResult, + Hash: tx.Hash(), + Height: txres.Height, + }, nil + } } } }