Files
tendermint/internal/rpc/core/mempool.go
2022-05-26 08:28:28 -04:00

193 lines
5.9 KiB
Go

package core
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/state/indexer"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/rpc/coretypes"
)
//-----------------------------------------------------------------------------
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
// BroadcastTxAsync returns right away, with no response. Does not wait for
// CheckTx nor DeliverTx results.
// More:
// https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
// Deprecated and should be removed in 0.37
func (env *Environment) BroadcastTxAsync(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) {
go func() { _ = env.Mempool.CheckTx(ctx, req.Tx, nil, mempool.TxInfo{}) }()
return &coretypes.ResultBroadcastTx{Hash: req.Tx.Hash()}, nil
}
// Deprecated and should be remove in 0.37
func (env *Environment) BroadcastTxSync(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) {
return env.BroadcastTx(ctx, req)
}
// BroadcastTx returns with the response from CheckTx. Does not wait for
// DeliverTx result.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func (env *Environment) BroadcastTx(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.ResponseCheckTx, 1)
err := env.Mempool.CheckTx(
ctx,
req.Tx,
func(res *abci.ResponseCheckTx) {
select {
case <-ctx.Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err())
case r := <-resCh:
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Codespace: r.Codespace,
Hash: req.Tx.Hash(),
}, nil
}
}
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit
func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.RequestBroadcastTx) (*coretypes.ResultBroadcastTxCommit, error) {
resCh := make(chan *abci.ResponseCheckTx, 1)
err := env.Mempool.CheckTx(
ctx,
req.Tx,
func(res *abci.ResponseCheckTx) {
select {
case <-ctx.Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err())
case r := <-resCh:
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: req.Tx.Hash(),
}, fmt.Errorf("wrong ABCI CodeType, got (%d) instead of OK", r.Code)
}
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: req.Tx.Hash(),
},
errors.New("cannot confirm transaction because kvEventSink is not enabled")
}
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()
count := 0
for {
count++
select {
case <-ctx.Done():
env.Logger.Error("error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: req.Tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
req.Tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, &coretypes.RequestTx{
Hash: req.Tx.Hash(),
Prove: false,
})
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
TxResult: txres.TxResult,
Hash: req.Tx.Hash(),
Height: txres.Height,
}, nil
}
}
}
}
// UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority
// More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs
func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error) {
totalCount := env.Mempool.Size()
perPage := env.validatePerPage(req.PerPage.IntPtr())
page, err := validatePage(req.Page.IntPtr(), perPage, totalCount)
if err != nil {
return nil, err
}
skipCount := validateSkipCount(page, perPage)
txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount))
result := txs[skipCount:]
return &coretypes.ResultUnconfirmedTxs{
Count: len(result),
Total: totalCount,
TotalBytes: env.Mempool.SizeBytes(),
Txs: result,
}, nil
}
// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {
return &coretypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.SizeBytes()}, nil
}
// CheckTx checks the transaction without executing it. The transaction won't
// be added to the mempool either.
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func (env *Environment) CheckTx(ctx context.Context, req *coretypes.RequestCheckTx) (*coretypes.ResultCheckTx, error) {
res, err := env.ProxyApp.CheckTx(ctx, &abci.RequestCheckTx{Tx: req.Tx})
if err != nil {
return nil, err
}
return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil
}
func (env *Environment) RemoveTx(ctx context.Context, req *coretypes.RequestRemoveTx) error {
return env.Mempool.RemoveTxByKey(req.TxKey)
}