mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-05 04:55:18 +00:00
rpc: avoid leaking threads (#8329)
This commit is contained in:
@@ -34,20 +34,29 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca
|
||||
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
|
||||
resCh := make(chan *abci.Response, 1)
|
||||
err := env.Mempool.CheckTx(tx, func(res *abci.Response) {
|
||||
resCh <- res
|
||||
select {
|
||||
case <-ctx.Context().Done():
|
||||
case resCh <- res:
|
||||
}
|
||||
|
||||
}, mempl.TxInfo{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := <-resCh
|
||||
r := res.GetCheckTx()
|
||||
return &ctypes.ResultBroadcastTx{
|
||||
Code: r.Code,
|
||||
Data: r.Data,
|
||||
Log: r.Log,
|
||||
Codespace: r.Codespace,
|
||||
Hash: tx.Hash(),
|
||||
}, nil
|
||||
|
||||
select {
|
||||
case <-ctx.Context().Done():
|
||||
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
|
||||
case res := <-resCh:
|
||||
r := res.GetCheckTx()
|
||||
return &ctypes.ResultBroadcastTx{
|
||||
Code: r.Code,
|
||||
Data: r.Data,
|
||||
Log: r.Log,
|
||||
Codespace: r.Codespace,
|
||||
Hash: tx.Hash(),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
|
||||
@@ -80,54 +89,61 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
|
||||
// Broadcast tx and wait for CheckTx result
|
||||
checkTxResCh := make(chan *abci.Response, 1)
|
||||
err = env.Mempool.CheckTx(tx, func(res *abci.Response) {
|
||||
checkTxResCh <- res
|
||||
select {
|
||||
case <-ctx.Context().Done():
|
||||
case checkTxResCh <- res:
|
||||
}
|
||||
}, mempl.TxInfo{})
|
||||
if err != nil {
|
||||
env.Logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
return nil, fmt.Errorf("error on broadcastTxCommit: %v", err)
|
||||
}
|
||||
checkTxResMsg := <-checkTxResCh
|
||||
checkTxRes := checkTxResMsg.GetCheckTx()
|
||||
if checkTxRes.Code != abci.CodeTypeOK {
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Wait for the tx to be included in a block or timeout.
|
||||
select {
|
||||
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
|
||||
deliverTxRes := msg.Data().(types.EventDataTx)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: deliverTxRes.Result,
|
||||
Hash: tx.Hash(),
|
||||
Height: deliverTxRes.Height,
|
||||
}, nil
|
||||
case <-deliverTxSub.Cancelled():
|
||||
var reason string
|
||||
if deliverTxSub.Err() == nil {
|
||||
reason = "Tendermint exited"
|
||||
} else {
|
||||
reason = deliverTxSub.Err().Error()
|
||||
case <-ctx.Context().Done():
|
||||
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
|
||||
case checkTxResMsg := <-checkTxResCh:
|
||||
checkTxRes := checkTxResMsg.GetCheckTx()
|
||||
if checkTxRes.Code != abci.CodeTypeOK {
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Wait for the tx to be included in a block or timeout.
|
||||
select {
|
||||
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
|
||||
deliverTxRes := msg.Data().(types.EventDataTx)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: deliverTxRes.Result,
|
||||
Hash: tx.Hash(),
|
||||
Height: deliverTxRes.Height,
|
||||
}, nil
|
||||
case <-deliverTxSub.Cancelled():
|
||||
var reason string
|
||||
if deliverTxSub.Err() == nil {
|
||||
reason = "Tendermint exited"
|
||||
} else {
|
||||
reason = deliverTxSub.Err().Error()
|
||||
}
|
||||
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
|
||||
env.Logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, err
|
||||
case <-time.After(env.Config.TimeoutBroadcastTxCommit):
|
||||
err = errors.New("timed out waiting for tx to be included in a block")
|
||||
env.Logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, err
|
||||
}
|
||||
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
|
||||
env.Logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, err
|
||||
case <-time.After(env.Config.TimeoutBroadcastTxCommit):
|
||||
err = errors.New("timed out waiting for tx to be included in a block")
|
||||
env.Logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user