Files
tendermint/rpc/core/mempool.go
Thane Thomson eed27addec docs: Update v0.34.x to prepare for v0.37 (#9244)
* Ignore generated/copied RPC docs

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Sync vuepress config with main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Sync docs package-lock.json with main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Sync docs redirects with main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Sync docs versions with main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update OpenAPI version to v0.34

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Sync DOCS_README with main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update all v0.34.x docs references from master to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update v0.34 OpenAPI references from master to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update repo doc links from master to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update code comment references from master to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update repo root doc links from master to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update repo root doc links for docs.tendermint.com from master to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Build v0.34.x as "latest"

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Explicitly mark v0.34 docs as latest in version selector

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add nav link to main and clearly mark as unstable

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Direct all docs.tendermint.com links to v0.34 on v0.34.x

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update all relevant links on v0.34.x branch to be v0.34-specific

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update changelog refs to docs.tendermint.com

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update remaining GH master link to main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Sync docs build and nav config with main

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Migrate spec links to GitHub repo from docs site

Signed-off-by: Thane Thomson <connect@thanethomson.com>

Signed-off-by: Thane Thomson <connect@thanethomson.com>
2022-08-19 07:20:32 -04:00

184 lines
6.2 KiB
Go

package core
import (
"context"
"errors"
"fmt"
"time"
abci "github.com/tendermint/tendermint/abci/types"
mempl "github.com/tendermint/tendermint/mempool"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
"github.com/tendermint/tendermint/types"
)
//-----------------------------------------------------------------------------
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
// BroadcastTxAsync returns right away, with no response. Does not wait for
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/v0.34/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{})
if err != nil {
return nil, err
}
return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}
// BroadcastTxSync returns with the response from CheckTx. Does not wait for
// DeliverTx result.
// More: https://docs.tendermint.com/v0.34/rpc/#/Tx/broadcast_tx_sync
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := env.Mempool.CheckTx(tx, func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
}
}, mempl.TxInfo{})
if err != nil {
return nil, err
}
select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case res := <-resCh:
r := res.GetCheckTx()
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
Hash: tx.Hash(),
}, nil
}
}
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
// More: https://docs.tendermint.com/v0.34/rpc/#/Tx/broadcast_tx_commit
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()
if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
} else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
}
// Subscribe to tx being committed in block.
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()
q := types.EventQueryTxFor(tx)
deliverTxSub, err := env.EventBus.Subscribe(subCtx, subscriber, q)
if err != nil {
err = fmt.Errorf("failed to subscribe to tx: %w", err)
env.Logger.Error("Error on broadcast_tx_commit", "err", err)
return nil, err
}
defer func() {
if err := env.EventBus.Unsubscribe(context.Background(), subscriber, q); err != nil {
env.Logger.Error("Error unsubscribing from eventBus", "err", err)
}
}()
// Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *abci.Response, 1)
err = env.Mempool.CheckTx(tx, func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case checkTxResCh <- res:
}
}, mempl.TxInfo{})
if err != nil {
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("error on broadcastTxCommit: %v", err)
}
select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case checkTxResMsg := <-checkTxResCh:
checkTxRes := checkTxResMsg.GetCheckTx()
if checkTxRes.Code != abci.CodeTypeOK {
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, nil
}
// Wait for the tx to be included in a block or timeout.
select {
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := msg.Data().(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: deliverTxRes.Result,
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
var reason string
if deliverTxSub.Err() == nil {
reason = "Tendermint exited"
} else {
reason = deliverTxSub.Err().Error()
}
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
case <-time.After(env.Config.TimeoutBroadcastTxCommit):
err = errors.New("timed out waiting for tx to be included in a block")
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
}
}
}
// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries)
// including their number.
// More: https://docs.tendermint.com/v0.34/rpc/#/Info/unconfirmed_txs
func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit := validatePerPage(limitPtr)
txs := env.Mempool.ReapMaxTxs(limit)
return &ctypes.ResultUnconfirmedTxs{
Count: len(txs),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.SizeBytes(),
Txs: txs}, nil
}
// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/v0.34/rpc/#/Info/num_unconfirmed_txs
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
return &ctypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
TotalBytes: env.Mempool.SizeBytes()}, nil
}
// CheckTx checks the transaction without executing it. The transaction won't
// be added to the mempool either.
// More: https://docs.tendermint.com/v0.34/rpc/#/Tx/check_tx
func CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err
}
return &ctypes.ResultCheckTx{ResponseCheckTx: *res}, nil
}