rpc: avoid leaking threads (#8328)

This commit is contained in:
Sam Kleinman
2022-04-13 14:09:49 -04:00
committed by GitHub
parent 29ad4dcb3b
commit c45367e22c

View File

@@ -38,23 +38,32 @@ func (env *Environment) BroadcastTxSync(ctx context.Context, tx types.Tx) (*core
err := env.Mempool.CheckTx(
ctx,
tx,
func(res *abci.ResponseCheckTx) { resCh <- res },
func(res *abci.ResponseCheckTx) {
select {
case <-ctx.Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
r := <-resCh
select {
case <-ctx.Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err())
case r := <-resCh:
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
}
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
}
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
@@ -64,61 +73,70 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*co
err := env.Mempool.CheckTx(
ctx,
tx,
func(res *abci.ResponseCheckTx) { resCh <- res },
func(res *abci.ResponseCheckTx) {
select {
case <-ctx.Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
r := <-resCh
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("transaction encountered error (%s)", r.MempoolError)
}
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
select {
case <-ctx.Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err())
case r := <-resCh:
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
},
errors.New("cannot confirm transaction because kvEventSink is not enabled")
}
}, fmt.Errorf("transaction encountered error (%s)", r.MempoolError)
}
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
for {
count++
select {
case <-ctx.Done():
env.Logger.Error("error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
},
errors.New("cannot confirm transaction because kvEventSink is not enabled")
}
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
TxResult: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
for {
count++
select {
case <-ctx.Done():
env.Logger.Error("error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
TxResult: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
}
}
}
}