rpc: avoid leaking threads during checktx (backport #8328) (#8333)

This commit is contained in:
mergify[bot]
2022-04-17 09:17:03 -04:00
committed by GitHub
parent 226bc94c5f
commit 04c1f76569

View File

@@ -37,24 +37,32 @@ func (env *Environment) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*co
err := env.Mempool.CheckTx(
ctx.Context(),
tx,
func(res *abci.Response) { resCh <- res },
func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
res := <-resCh
r := res.GetCheckTx()
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case res := <-resCh:
r := res.GetCheckTx()
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
}
}
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
@@ -64,61 +72,71 @@ func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*
err := env.Mempool.CheckTx(
ctx.Context(),
tx,
func(res *abci.Response) { resCh <- res },
func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
r := (<-resCh).GetCheckTx()
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("transaction encountered error (%s)", r.MempoolError)
}
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case res := <-resCh:
r := res.GetCheckTx()
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
},
errors.New("cannot wait for commit because kvEventSync is not enabled")
}
}, fmt.Errorf("transaction encountered error (%s)", r.MempoolError)
}
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
for {
count++
select {
case <-ctx.Context().Done():
env.Logger.Error("Error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
},
errors.New("cannot confirm transaction because kvEventSink is not enabled")
}
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
DeliverTx: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
for {
count++
select {
case <-ctx.Context().Done():
env.Logger.Error("error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
DeliverTx: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
}
}
}
}