diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 79aa03bc5..74212e9d1 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -34,20 +34,29 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := env.Mempool.CheckTx(tx, func(res *abci.Response) { - resCh <- res + select { + case <-ctx.Context().Done(): + case resCh <- res: + } + }, mempl.TxInfo{}) if err != nil { return nil, err } - res := <-resCh - r := res.GetCheckTx() - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - Codespace: r.Codespace, - Hash: tx.Hash(), - }, nil + + select { + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case res := <-resCh: + r := res.GetCheckTx() + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + Codespace: r.Codespace, + Hash: tx.Hash(), + }, nil + } } // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. @@ -80,54 +89,61 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) err = env.Mempool.CheckTx(tx, func(res *abci.Response) { - checkTxResCh <- res + select { + case <-ctx.Context().Done(): + case checkTxResCh <- res: + } }, mempl.TxInfo{}) if err != nil { env.Logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } - checkTxResMsg := <-checkTxResCh - checkTxRes := checkTxResMsg.GetCheckTx() - if checkTxRes.Code != abci.CodeTypeOK { - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: abci.ResponseDeliverTx{}, - Hash: tx.Hash(), - }, nil - } - - // Wait for the tx to be included in a block or timeout. select { - case msg := <-deliverTxSub.Out(): // The tx was included in a block. - deliverTxRes := msg.Data().(types.EventDataTx) - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: deliverTxRes.Result, - Hash: tx.Hash(), - Height: deliverTxRes.Height, - }, nil - case <-deliverTxSub.Cancelled(): - var reason string - if deliverTxSub.Err() == nil { - reason = "Tendermint exited" - } else { - reason = deliverTxSub.Err().Error() + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case checkTxResMsg := <-checkTxResCh: + checkTxRes := checkTxResMsg.GetCheckTx() + if checkTxRes.Code != abci.CodeTypeOK { + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, nil + } + + // Wait for the tx to be included in a block or timeout. + select { + case msg := <-deliverTxSub.Out(): // The tx was included in a block. + deliverTxRes := msg.Data().(types.EventDataTx) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: deliverTxRes.Result, + Hash: tx.Hash(), + Height: deliverTxRes.Height, + }, nil + case <-deliverTxSub.Cancelled(): + var reason string + if deliverTxSub.Err() == nil { + reason = "Tendermint exited" + } else { + reason = deliverTxSub.Err().Error() + } + err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err + case <-time.After(env.Config.TimeoutBroadcastTxCommit): + err = errors.New("timed out waiting for tx to be included in a block") + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err } - err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) - env.Logger.Error("Error on broadcastTxCommit", "err", err) - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: abci.ResponseDeliverTx{}, - Hash: tx.Hash(), - }, err - case <-time.After(env.Config.TimeoutBroadcastTxCommit): - err = errors.New("timed out waiting for tx to be included in a block") - env.Logger.Error("Error on broadcastTxCommit", "err", err) - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: abci.ResponseDeliverTx{}, - Hash: tx.Hash(), - }, err } }