Files
tendermint/internal/rpc/core/mempool.go
Sam Kleinman 851d2e3bde mempool,rpc: add removetx rpc method (#7047)
Addresses one of the concerns with #7041.

Provides a mechanism (via the RPC interface) to delete a single transaction, described by its hash, from the mempool. The method returns an error if the transaction cannot be found. Once the transaction is removed it remains in the cache and cannot be resubmitted until the cache is cleared or it expires from the cache.
2021-10-05 20:23:15 +00:00

158 lines
4.8 KiB
Go

package core
import (
"errors"
"fmt"
"math/rand"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/rpc/coretypes"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
"github.com/tendermint/tendermint/types"
)
//-----------------------------------------------------------------------------
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
// BroadcastTxAsync returns right away, with no response. Does not wait for
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func (env *Environment) BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTx(ctx.Context(), tx, nil, mempool.TxInfo{})
if err != nil {
return nil, err
}
return &coretypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}
// BroadcastTxSync returns with the response from CheckTx. Does not wait for
// DeliverTx result.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func (env *Environment) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*coretypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := env.Mempool.CheckTx(
ctx.Context(),
tx,
func(res *abci.Response) { resCh <- res },
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
res := <-resCh
r := res.GetCheckTx()
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
}
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit
func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*coretypes.ResultBroadcastTxCommit, error) { //nolint:lll
resCh := make(chan *abci.Response, 1)
err := env.Mempool.CheckTx(
ctx.Context(),
tx,
func(res *abci.Response) { resCh <- res },
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
r := (<-resCh).GetCheckTx()
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
},
errors.New("cannot wait for commit because kvEventSync is not enabled")
}
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
for {
count++
select {
case <-ctx.Context().Done():
env.Logger.Error("Error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
DeliverTx: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
}
}
}
// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries)
// including their number.
// More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs
func (env *Environment) UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit := env.validatePerPage(limitPtr)
txs := env.Mempool.ReapMaxTxs(limit)
return &coretypes.ResultUnconfirmedTxs{
Count: len(txs),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.SizeBytes(),
Txs: txs}, nil
}
// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func (env *Environment) NumUnconfirmedTxs(ctx *rpctypes.Context) (*coretypes.ResultUnconfirmedTxs, error) {
return &coretypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.SizeBytes()}, nil
}
// CheckTx checks the transaction without executing it. The transaction won't
// be added to the mempool either.
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func (env *Environment) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTxSync(ctx.Context(), abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err
}
return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil
}
func (env *Environment) RemoveTx(ctx *rpctypes.Context, txkey types.TxKey) error {
return env.Mempool.RemoveTxByKey(txkey)
}