mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 23:10:59 +00:00
move Mempool interface into mempool package
Refs #2659 Breaking changes in the mempool package: - Mempool now an interface - old Mempool renamed to CListMempool Breaking changes to state package: - MockMempool moved to mempool/mock package and renamed to Mempool - Mempool interface moved to mempool package
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
memplmock "github.com/tendermint/tendermint/mempool/mock"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
@@ -92,7 +93,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
|
||||
// pool.height is determined from the store.
|
||||
fastSync := true
|
||||
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
|
||||
sm.MockMempool{}, sm.MockEvidencePool{})
|
||||
memplmock.Mempool{}, sm.MockEvidencePool{})
|
||||
|
||||
// let's add some blocks in
|
||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
|
||||
@@ -268,7 +268,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
|
||||
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
|
||||
|
||||
// Make Mempool
|
||||
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
|
||||
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
|
||||
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
|
||||
if thisConfig.Consensus.WaitForTxs() {
|
||||
mempool.EnableTxsAvailable()
|
||||
|
||||
@@ -11,13 +11,13 @@ import (
|
||||
|
||||
"github.com/tendermint/tendermint/abci/example/code"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// for testing
|
||||
func assertMempool(txn txNotifier) sm.Mempool {
|
||||
return txn.(sm.Mempool)
|
||||
func assertMempool(txn txNotifier) mempl.Mempool {
|
||||
return txn.(mempl.Mempool)
|
||||
}
|
||||
|
||||
func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
|
||||
|
||||
@@ -136,7 +136,7 @@ func TestReactorWithEvidence(t *testing.T) {
|
||||
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
|
||||
|
||||
// Make Mempool
|
||||
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
|
||||
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
|
||||
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
|
||||
if thisConfig.Consensus.WaitForTxs() {
|
||||
mempool.EnableTxsAvailable()
|
||||
|
||||
@@ -16,11 +16,11 @@ import (
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
memplmock "github.com/tendermint/tendermint/mempool/mock"
|
||||
)
|
||||
|
||||
var crc32c = crc32.MakeTable(crc32.Castagnoli)
|
||||
@@ -442,7 +442,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
|
||||
block := h.store.LoadBlock(height)
|
||||
meta := h.store.LoadBlockMeta(height)
|
||||
|
||||
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
|
||||
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, memplmock.Mempool{}, sm.MockEvidencePool{})
|
||||
blockExec.SetEventBus(h.eventBus)
|
||||
|
||||
var err error
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
memplmock "github.com/tendermint/tendermint/mempool/mock"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -312,7 +313,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
|
||||
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
|
||||
}
|
||||
|
||||
mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
|
||||
mempool, evpool := memplmock.Mempool{}, sm.MockEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
|
||||
|
||||
consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
memplmock "github.com/tendermint/tendermint/mempool/mock"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
@@ -267,7 +268,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
mempool = sm.MockMempool{}
|
||||
mempool = memplmock.Mempool{}
|
||||
evpool = sm.MockEvidencePool{}
|
||||
)
|
||||
|
||||
|
||||
@@ -10,12 +10,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
memplmock "github.com/tendermint/tendermint/mempool/mock"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
@@ -66,7 +68,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
|
||||
return errors.Wrap(err, "failed to start event bus")
|
||||
}
|
||||
defer eventBus.Stop()
|
||||
mempool := sm.MockMempool{}
|
||||
mempool := memplmock.Mempool{}
|
||||
evpool := sm.MockEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
|
||||
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
|
||||
|
||||
742
mempool/clist_mempool.go
Normal file
742
mempool/clist_mempool.go
Normal file
@@ -0,0 +1,742 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
auto "github.com/tendermint/tendermint/libs/autofile"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// CListMempool is an ordered in-memory pool for transactions before they are
|
||||
// proposed in a consensus round. Transaction validity is checked using the
|
||||
// CheckTx abci message before the transaction is added to the pool. The
|
||||
// mempool uses a concurrent list structure for storing transactions that can
|
||||
// be efficiently accessed by multiple concurrent readers.
|
||||
type CListMempool struct {
|
||||
config *cfg.MempoolConfig
|
||||
|
||||
proxyMtx sync.Mutex
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
preCheck PreCheckFunc
|
||||
postCheck PostCheckFunc
|
||||
|
||||
// Track whether we're rechecking txs.
|
||||
// These are not protected by a mutex and are expected to be mutated
|
||||
// in serial (ie. by abci responses which are called in serial).
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
recheckEnd *clist.CElement // re-checking stops here
|
||||
|
||||
// notify listeners (ie. consensus) when txs are available
|
||||
notifiedTxsAvailable bool
|
||||
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
|
||||
|
||||
// Map for quick access to txs to record sender in CheckTx.
|
||||
// txsMap: txKey -> CElement
|
||||
txsMap sync.Map
|
||||
|
||||
// Atomic integers
|
||||
height int64 // the last block Update()'d to
|
||||
rechecking int32 // for re-checking filtered txs on Update()
|
||||
txsBytes int64 // total size of mempool, in bytes
|
||||
|
||||
// Keep a cache of already-seen txs.
|
||||
// This reduces the pressure on the proxyApp.
|
||||
cache txCache
|
||||
|
||||
// A log of mempool txs
|
||||
wal *auto.AutoFile
|
||||
|
||||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// Option sets an optional parameter on the mempool.
|
||||
type Option func(*CListMempool)
|
||||
|
||||
// NewCListMempool returns a new mempool with the given configuration and connection to an application.
|
||||
func NewCListMempool(
|
||||
config *cfg.MempoolConfig,
|
||||
proxyAppConn proxy.AppConnMempool,
|
||||
height int64,
|
||||
options ...Option,
|
||||
) *CListMempool {
|
||||
mempool := &CListMempool{
|
||||
config: config,
|
||||
proxyAppConn: proxyAppConn,
|
||||
txs: clist.New(),
|
||||
height: height,
|
||||
rechecking: 0,
|
||||
recheckCursor: nil,
|
||||
recheckEnd: nil,
|
||||
logger: log.NewNopLogger(),
|
||||
metrics: NopMetrics(),
|
||||
}
|
||||
if config.CacheSize > 0 {
|
||||
mempool.cache = newMapTxCache(config.CacheSize)
|
||||
} else {
|
||||
mempool.cache = nopTxCache{}
|
||||
}
|
||||
proxyAppConn.SetResponseCallback(mempool.globalCb)
|
||||
for _, option := range options {
|
||||
option(mempool)
|
||||
}
|
||||
return mempool
|
||||
}
|
||||
|
||||
// EnableTxsAvailable initializes the TxsAvailable channel,
|
||||
// ensuring it will trigger once every height when transactions are available.
|
||||
// NOTE: not thread safe - should only be called once, on startup
|
||||
func (mem *CListMempool) EnableTxsAvailable() {
|
||||
mem.txsAvailable = make(chan struct{}, 1)
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger.
|
||||
func (mem *CListMempool) SetLogger(l log.Logger) {
|
||||
mem.logger = l
|
||||
}
|
||||
|
||||
// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
|
||||
// false. This is ran before CheckTx.
|
||||
func WithPreCheck(f PreCheckFunc) Option {
|
||||
return func(mem *CListMempool) { mem.preCheck = f }
|
||||
}
|
||||
|
||||
// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
|
||||
// false. This is ran after CheckTx.
|
||||
func WithPostCheck(f PostCheckFunc) Option {
|
||||
return func(mem *CListMempool) { mem.postCheck = f }
|
||||
}
|
||||
|
||||
// WithMetrics sets the metrics.
|
||||
func WithMetrics(metrics *Metrics) Option {
|
||||
return func(mem *CListMempool) { mem.metrics = metrics }
|
||||
}
|
||||
|
||||
// InitWAL creates a directory for the WAL file and opens a file itself.
|
||||
//
|
||||
// *panics* if can't create directory or open file.
|
||||
// *not thread safe*
|
||||
func (mem *CListMempool) InitWAL() {
|
||||
walDir := mem.config.WalDir()
|
||||
err := cmn.EnsureDir(walDir, 0700)
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "Error ensuring WAL dir"))
|
||||
}
|
||||
af, err := auto.OpenAutoFile(walDir + "/wal")
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "Error opening WAL file"))
|
||||
}
|
||||
mem.wal = af
|
||||
}
|
||||
|
||||
// CloseWAL closes and discards the underlying WAL file.
|
||||
// Any further writes will not be relayed to disk.
|
||||
func (mem *CListMempool) CloseWAL() {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
if err := mem.wal.Close(); err != nil {
|
||||
mem.logger.Error("Error closing WAL", "err", err)
|
||||
}
|
||||
mem.wal = nil
|
||||
}
|
||||
|
||||
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
|
||||
func (mem *CListMempool) Lock() {
|
||||
mem.proxyMtx.Lock()
|
||||
}
|
||||
|
||||
// Unlock unlocks the mempool.
|
||||
func (mem *CListMempool) Unlock() {
|
||||
mem.proxyMtx.Unlock()
|
||||
}
|
||||
|
||||
// Size returns the number of transactions in the mempool.
|
||||
func (mem *CListMempool) Size() int {
|
||||
return mem.txs.Len()
|
||||
}
|
||||
|
||||
// TxsBytes returns the total size of all txs in the mempool.
|
||||
func (mem *CListMempool) TxsBytes() int64 {
|
||||
return atomic.LoadInt64(&mem.txsBytes)
|
||||
}
|
||||
|
||||
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
|
||||
// done. E.g. from CheckTx.
|
||||
func (mem *CListMempool) FlushAppConn() error {
|
||||
return mem.proxyAppConn.FlushSync()
|
||||
}
|
||||
|
||||
// Flush removes all transactions from the mempool and cache
|
||||
func (mem *CListMempool) Flush() {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
mem.cache.Reset()
|
||||
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
mem.txs.Remove(e)
|
||||
e.DetachPrev()
|
||||
}
|
||||
|
||||
mem.txsMap = sync.Map{}
|
||||
_ = atomic.SwapInt64(&mem.txsBytes, 0)
|
||||
}
|
||||
|
||||
// TxsFront returns the first transaction in the ordered list for peer
|
||||
// goroutines to call .NextWait() on.
|
||||
func (mem *CListMempool) TxsFront() *clist.CElement {
|
||||
return mem.txs.Front()
|
||||
}
|
||||
|
||||
// TxsWaitChan returns a channel to wait on transactions. It will be closed
|
||||
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
|
||||
// element)
|
||||
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
|
||||
return mem.txs.WaitChan()
|
||||
}
|
||||
|
||||
// CheckTx executes a new transaction against the application to determine its validity
|
||||
// and whether it should be added to the mempool.
|
||||
// It blocks if we're waiting on Update() or Reap().
|
||||
// cb: A callback from the CheckTx command.
|
||||
// It gets called from another goroutine.
|
||||
// CONTRACT: Either cb will get called, or err returned.
|
||||
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
|
||||
return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID})
|
||||
}
|
||||
|
||||
// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx.
|
||||
// Currently this metadata is the peer who sent it,
|
||||
// used to prevent the tx from being gossiped back to them.
|
||||
func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
|
||||
mem.proxyMtx.Lock()
|
||||
// use defer to unlock mutex because application (*local client*) might panic
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
var (
|
||||
memSize = mem.Size()
|
||||
txsBytes = mem.TxsBytes()
|
||||
)
|
||||
if memSize >= mem.config.Size ||
|
||||
int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
|
||||
return ErrMempoolIsFull{
|
||||
memSize, mem.config.Size,
|
||||
txsBytes, mem.config.MaxTxsBytes}
|
||||
}
|
||||
|
||||
// The size of the corresponding amino-encoded TxMessage
|
||||
// can't be larger than the maxMsgSize, otherwise we can't
|
||||
// relay it to peers.
|
||||
if len(tx) > maxTxSize {
|
||||
return ErrTxTooLarge
|
||||
}
|
||||
|
||||
if mem.preCheck != nil {
|
||||
if err := mem.preCheck(tx); err != nil {
|
||||
return ErrPreCheck{err}
|
||||
}
|
||||
}
|
||||
|
||||
// CACHE
|
||||
if !mem.cache.Push(tx) {
|
||||
// Record a new sender for a tx we've already seen.
|
||||
// Note it's possible a tx is still in the cache but no longer in the mempool
|
||||
// (eg. after committing a block, txs are removed from mempool but not cache),
|
||||
// so we only record the sender for txs still in the mempool.
|
||||
if e, ok := mem.txsMap.Load(txKey(tx)); ok {
|
||||
memTx := e.(*clist.CElement).Value.(*mempoolTx)
|
||||
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
|
||||
// TODO: consider punishing peer for dups,
|
||||
// its non-trivial since invalid txs can become valid,
|
||||
// but they can spam the same tx with little cost to them atm.
|
||||
}
|
||||
}
|
||||
|
||||
return ErrTxInCache
|
||||
}
|
||||
// END CACHE
|
||||
|
||||
// WAL
|
||||
if mem.wal != nil {
|
||||
// TODO: Notify administrators when WAL fails
|
||||
_, err := mem.wal.Write([]byte(tx))
|
||||
if err != nil {
|
||||
mem.logger.Error("Error writing to WAL", "err", err)
|
||||
}
|
||||
_, err = mem.wal.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
mem.logger.Error("Error writing to WAL", "err", err)
|
||||
}
|
||||
}
|
||||
// END WAL
|
||||
|
||||
// NOTE: proxyAppConn may error if tx buffer is full
|
||||
if err = mem.proxyAppConn.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
|
||||
reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Global callback that will be called after every ABCI response.
|
||||
// Having a single global callback avoids needing to set a callback for each request.
|
||||
// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
|
||||
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
|
||||
// include this information. If we're not in the midst of a recheck, this function will just return,
|
||||
// so the request specific callback can do the work.
|
||||
// When rechecking, we don't need the peerID, so the recheck callback happens here.
|
||||
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
|
||||
if mem.recheckCursor == nil {
|
||||
return
|
||||
}
|
||||
|
||||
mem.metrics.RecheckTimes.Add(1)
|
||||
mem.resCbRecheck(req, res)
|
||||
|
||||
// update metrics
|
||||
mem.metrics.Size.Set(float64(mem.Size()))
|
||||
}
|
||||
|
||||
// Request specific callback that should be set on individual reqRes objects
|
||||
// to incorporate local information when processing the response.
|
||||
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
|
||||
// NOTE: alternatively, we could include this information in the ABCI request itself.
|
||||
//
|
||||
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
|
||||
// when all other response processing is complete.
|
||||
//
|
||||
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
|
||||
func (mem *CListMempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
|
||||
return func(res *abci.Response) {
|
||||
if mem.recheckCursor != nil {
|
||||
// this should never happen
|
||||
panic("recheck cursor is not nil in reqResCb")
|
||||
}
|
||||
|
||||
mem.resCbFirstTime(tx, peerID, res)
|
||||
|
||||
// update metrics
|
||||
mem.metrics.Size.Set(float64(mem.Size()))
|
||||
|
||||
// passed in by the caller of CheckTx, eg. the RPC
|
||||
if externalCb != nil {
|
||||
externalCb(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called from:
|
||||
// - resCbFirstTime (lock not held) if tx is valid
|
||||
func (mem *CListMempool) addTx(memTx *mempoolTx) {
|
||||
e := mem.txs.PushBack(memTx)
|
||||
mem.txsMap.Store(txKey(memTx.tx), e)
|
||||
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
|
||||
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
|
||||
}
|
||||
|
||||
// Called from:
|
||||
// - Update (lock held) if tx was committed
|
||||
// - resCbRecheck (lock not held) if tx was invalidated
|
||||
func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
|
||||
mem.txs.Remove(elem)
|
||||
elem.DetachPrev()
|
||||
mem.txsMap.Delete(txKey(tx))
|
||||
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
|
||||
|
||||
if removeFromCache {
|
||||
mem.cache.Remove(tx)
|
||||
}
|
||||
}
|
||||
|
||||
// callback, which is called after the app checked the tx for the first time.
|
||||
//
|
||||
// The case where the app checks the tx for the second and subsequent times is
|
||||
// handled by the resCbRecheck callback.
|
||||
func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) {
|
||||
switch r := res.Value.(type) {
|
||||
case *abci.Response_CheckTx:
|
||||
var postCheckErr error
|
||||
if mem.postCheck != nil {
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
}
|
||||
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
||||
memTx := &mempoolTx{
|
||||
height: mem.height,
|
||||
gasWanted: r.CheckTx.GasWanted,
|
||||
tx: tx,
|
||||
}
|
||||
memTx.senders.Store(peerID, true)
|
||||
mem.addTx(memTx)
|
||||
mem.logger.Info("Added good transaction",
|
||||
"tx", txID(tx),
|
||||
"res", r,
|
||||
"height", memTx.height,
|
||||
"total", mem.Size(),
|
||||
)
|
||||
mem.notifyTxsAvailable()
|
||||
} else {
|
||||
// ignore bad transaction
|
||||
mem.logger.Info("Rejected bad transaction", "tx", txID(tx), "res", r, "err", postCheckErr)
|
||||
mem.metrics.FailedTxs.Add(1)
|
||||
// remove from cache (it might be good later)
|
||||
mem.cache.Remove(tx)
|
||||
}
|
||||
default:
|
||||
// ignore other messages
|
||||
}
|
||||
}
|
||||
|
||||
// callback, which is called after the app rechecked the tx.
|
||||
//
|
||||
// The case where the app checks the tx for the first time is handled by the
|
||||
// resCbFirstTime callback.
|
||||
func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
||||
switch r := res.Value.(type) {
|
||||
case *abci.Response_CheckTx:
|
||||
tx := req.GetCheckTx().Tx
|
||||
memTx := mem.recheckCursor.Value.(*mempoolTx)
|
||||
if !bytes.Equal(tx, memTx.tx) {
|
||||
panic(fmt.Sprintf(
|
||||
"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
|
||||
memTx.tx,
|
||||
tx))
|
||||
}
|
||||
var postCheckErr error
|
||||
if mem.postCheck != nil {
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
}
|
||||
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
||||
// Good, nothing to do.
|
||||
} else {
|
||||
// Tx became invalidated due to newly committed block.
|
||||
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
|
||||
// NOTE: we remove tx from the cache because it might be good later
|
||||
mem.removeTx(tx, mem.recheckCursor, true)
|
||||
}
|
||||
if mem.recheckCursor == mem.recheckEnd {
|
||||
mem.recheckCursor = nil
|
||||
} else {
|
||||
mem.recheckCursor = mem.recheckCursor.Next()
|
||||
}
|
||||
if mem.recheckCursor == nil {
|
||||
// Done!
|
||||
atomic.StoreInt32(&mem.rechecking, 0)
|
||||
mem.logger.Info("Done rechecking txs")
|
||||
|
||||
// incase the recheck removed all txs
|
||||
if mem.Size() > 0 {
|
||||
mem.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
default:
|
||||
// ignore other messages
|
||||
}
|
||||
}
|
||||
|
||||
// TxsAvailable returns a channel which fires once for every height,
|
||||
// and only when transactions are available in the mempool.
|
||||
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
|
||||
func (mem *CListMempool) TxsAvailable() <-chan struct{} {
|
||||
return mem.txsAvailable
|
||||
}
|
||||
|
||||
func (mem *CListMempool) notifyTxsAvailable() {
|
||||
if mem.Size() == 0 {
|
||||
panic("notified txs available but mempool is empty!")
|
||||
}
|
||||
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
|
||||
// channel cap is 1, so this will send once
|
||||
mem.notifiedTxsAvailable = true
|
||||
select {
|
||||
case mem.txsAvailable <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total
|
||||
// with the condition that the total gasWanted must be less than maxGas.
|
||||
// If both maxes are negative, there is no cap on the size of all returned
|
||||
// transactions (~ all available transactions).
|
||||
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
||||
// TODO: Something better?
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
var totalGas int64
|
||||
// TODO: we will get a performance boost if we have a good estimate of avg
|
||||
// size per tx, and set the initial capacity based off of that.
|
||||
// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
|
||||
txs := make([]types.Tx, 0, mem.txs.Len())
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
// Check total size requirement
|
||||
aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1)
|
||||
if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes {
|
||||
return txs
|
||||
}
|
||||
totalBytes += int64(len(memTx.tx)) + aminoOverhead
|
||||
// Check total gas requirement.
|
||||
// If maxGas is negative, skip this check.
|
||||
// Since newTotalGas < masGas, which
|
||||
// must be non-negative, it follows that this won't overflow.
|
||||
newTotalGas := totalGas + memTx.gasWanted
|
||||
if maxGas > -1 && newTotalGas > maxGas {
|
||||
return txs
|
||||
}
|
||||
totalGas = newTotalGas
|
||||
txs = append(txs, memTx.tx)
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// ReapMaxTxs reaps up to max transactions from the mempool.
|
||||
// If max is negative, there is no cap on the size of all returned
|
||||
// transactions (~ all available transactions).
|
||||
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
if max < 0 {
|
||||
max = mem.txs.Len()
|
||||
}
|
||||
|
||||
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
||||
// TODO: Something better?
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
|
||||
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
txs = append(txs, memTx.tx)
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// Update informs the mempool that the given txs were committed and can be discarded.
|
||||
// NOTE: this should be called *after* block is committed by consensus.
|
||||
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
||||
func (mem *CListMempool) Update(
|
||||
height int64,
|
||||
txs types.Txs,
|
||||
preCheck PreCheckFunc,
|
||||
postCheck PostCheckFunc,
|
||||
) error {
|
||||
// Set height
|
||||
mem.height = height
|
||||
mem.notifiedTxsAvailable = false
|
||||
|
||||
if preCheck != nil {
|
||||
mem.preCheck = preCheck
|
||||
}
|
||||
if postCheck != nil {
|
||||
mem.postCheck = postCheck
|
||||
}
|
||||
|
||||
// Add committed transactions to cache (if missing).
|
||||
for _, tx := range txs {
|
||||
_ = mem.cache.Push(tx)
|
||||
}
|
||||
|
||||
// Remove committed transactions.
|
||||
txsLeft := mem.removeTxs(txs)
|
||||
|
||||
// Either recheck non-committed txs to see if they became invalid
|
||||
// or just notify there're some txs left.
|
||||
if len(txsLeft) > 0 {
|
||||
if mem.config.Recheck {
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
|
||||
mem.recheckTxs(txsLeft)
|
||||
// At this point, mem.txs are being rechecked.
|
||||
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
|
||||
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
|
||||
} else {
|
||||
mem.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
mem.metrics.Size.Set(float64(mem.Size()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mem *CListMempool) removeTxs(txs types.Txs) []types.Tx {
|
||||
// Build a map for faster lookups.
|
||||
txsMap := make(map[string]struct{}, len(txs))
|
||||
for _, tx := range txs {
|
||||
txsMap[string(tx)] = struct{}{}
|
||||
}
|
||||
|
||||
txsLeft := make([]types.Tx, 0, mem.txs.Len())
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
// Remove the tx if it's already in a block.
|
||||
if _, ok := txsMap[string(memTx.tx)]; ok {
|
||||
// NOTE: we don't remove committed txs from the cache.
|
||||
mem.removeTx(memTx.tx, e, false)
|
||||
|
||||
continue
|
||||
}
|
||||
txsLeft = append(txsLeft, memTx.tx)
|
||||
}
|
||||
return txsLeft
|
||||
}
|
||||
|
||||
// NOTE: pass in txs because mem.txs can mutate concurrently.
|
||||
func (mem *CListMempool) recheckTxs(txs []types.Tx) {
|
||||
if len(txs) == 0 {
|
||||
return
|
||||
}
|
||||
atomic.StoreInt32(&mem.rechecking, 1)
|
||||
mem.recheckCursor = mem.txs.Front()
|
||||
mem.recheckEnd = mem.txs.Back()
|
||||
|
||||
// Push txs to proxyAppConn
|
||||
// NOTE: globalCb may be called concurrently.
|
||||
for _, tx := range txs {
|
||||
mem.proxyAppConn.CheckTxAsync(tx)
|
||||
}
|
||||
mem.proxyAppConn.FlushAsync()
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// mempoolTx is a transaction that successfully ran
|
||||
type mempoolTx struct {
|
||||
height int64 // height that this tx had been validated in
|
||||
gasWanted int64 // amount of gas this tx states it will require
|
||||
tx types.Tx //
|
||||
|
||||
// ids of peers who've sent us this tx (as a map for quick lookups).
|
||||
// senders: PeerID -> bool
|
||||
senders sync.Map
|
||||
}
|
||||
|
||||
// Height returns the height for this transaction
|
||||
func (memTx *mempoolTx) Height() int64 {
|
||||
return atomic.LoadInt64(&memTx.height)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
type txCache interface {
|
||||
Reset()
|
||||
Push(tx types.Tx) bool
|
||||
Remove(tx types.Tx)
|
||||
}
|
||||
|
||||
// mapTxCache maintains a LRU cache of transactions. This only stores the hash
|
||||
// of the tx, due to memory concerns.
|
||||
type mapTxCache struct {
|
||||
mtx sync.Mutex
|
||||
size int
|
||||
map_ map[[sha256.Size]byte]*list.Element
|
||||
list *list.List
|
||||
}
|
||||
|
||||
var _ txCache = (*mapTxCache)(nil)
|
||||
|
||||
// newMapTxCache returns a new mapTxCache.
|
||||
func newMapTxCache(cacheSize int) *mapTxCache {
|
||||
return &mapTxCache{
|
||||
size: cacheSize,
|
||||
map_: make(map[[sha256.Size]byte]*list.Element, cacheSize),
|
||||
list: list.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Reset resets the cache to an empty state.
|
||||
func (cache *mapTxCache) Reset() {
|
||||
cache.mtx.Lock()
|
||||
cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size)
|
||||
cache.list.Init()
|
||||
cache.mtx.Unlock()
|
||||
}
|
||||
|
||||
// Push adds the given tx to the cache and returns true. It returns
|
||||
// false if tx is already in the cache.
|
||||
func (cache *mapTxCache) Push(tx types.Tx) bool {
|
||||
cache.mtx.Lock()
|
||||
defer cache.mtx.Unlock()
|
||||
|
||||
// Use the tx hash in the cache
|
||||
txHash := txKey(tx)
|
||||
if moved, exists := cache.map_[txHash]; exists {
|
||||
cache.list.MoveToBack(moved)
|
||||
return false
|
||||
}
|
||||
|
||||
if cache.list.Len() >= cache.size {
|
||||
popped := cache.list.Front()
|
||||
poppedTxHash := popped.Value.([sha256.Size]byte)
|
||||
delete(cache.map_, poppedTxHash)
|
||||
if popped != nil {
|
||||
cache.list.Remove(popped)
|
||||
}
|
||||
}
|
||||
e := cache.list.PushBack(txHash)
|
||||
cache.map_[txHash] = e
|
||||
return true
|
||||
}
|
||||
|
||||
// Remove removes the given tx from the cache.
|
||||
func (cache *mapTxCache) Remove(tx types.Tx) {
|
||||
cache.mtx.Lock()
|
||||
txHash := txKey(tx)
|
||||
popped := cache.map_[txHash]
|
||||
delete(cache.map_, txHash)
|
||||
if popped != nil {
|
||||
cache.list.Remove(popped)
|
||||
}
|
||||
|
||||
cache.mtx.Unlock()
|
||||
}
|
||||
|
||||
type nopTxCache struct{}
|
||||
|
||||
var _ txCache = (*nopTxCache)(nil)
|
||||
|
||||
func (nopTxCache) Reset() {}
|
||||
func (nopTxCache) Push(types.Tx) bool { return true }
|
||||
func (nopTxCache) Remove(types.Tx) {}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// txKey is the fixed length array sha256 hash used as the key in maps.
|
||||
func txKey(tx types.Tx) [sha256.Size]byte {
|
||||
return sha256.Sum256(tx)
|
||||
}
|
||||
|
||||
// txID is the hex encoded hash of the bytes as a types.Tx.
|
||||
func txID(tx []byte) string {
|
||||
return fmt.Sprintf("%X", types.Tx(tx).Hash())
|
||||
}
|
||||
@@ -32,18 +32,18 @@ import (
|
||||
// test.
|
||||
type cleanupFunc func()
|
||||
|
||||
func newMempoolWithApp(cc proxy.ClientCreator) (*Mempool, cleanupFunc) {
|
||||
func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) {
|
||||
return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test"))
|
||||
}
|
||||
|
||||
func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*Mempool, cleanupFunc) {
|
||||
func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*CListMempool, cleanupFunc) {
|
||||
appConnMem, _ := cc.NewABCIClient()
|
||||
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
|
||||
err := appConnMem.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
mempool := NewMempool(config.Mempool, appConnMem, 0)
|
||||
mempool := NewCListMempool(config.Mempool, appConnMem, 0)
|
||||
mempool.SetLogger(log.TestingLogger())
|
||||
return mempool, func() { os.RemoveAll(config.RootDir) }
|
||||
}
|
||||
@@ -66,7 +66,7 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
|
||||
}
|
||||
}
|
||||
|
||||
func checkTxs(t *testing.T, mempool *Mempool, count int, peerID uint16) types.Txs {
|
||||
func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs {
|
||||
txs := make(types.Txs, count)
|
||||
txInfo := TxInfo{PeerID: peerID}
|
||||
for i := 0; i < count; i++ {
|
||||
@@ -348,7 +348,6 @@ func TestMempoolCloseWAL(t *testing.T) {
|
||||
// 1. Create the temporary directory for mempool and WAL testing.
|
||||
rootDir, err := ioutil.TempDir("", "mempool-test")
|
||||
require.Nil(t, err, "expecting successful tmpdir creation")
|
||||
defer os.RemoveAll(rootDir)
|
||||
|
||||
// 2. Ensure that it doesn't contain any elements -- Sanity check
|
||||
m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
||||
@@ -356,13 +355,13 @@ func TestMempoolCloseWAL(t *testing.T) {
|
||||
require.Equal(t, 0, len(m1), "no matches yet")
|
||||
|
||||
// 3. Create the mempool
|
||||
wcfg := cfg.DefaultMempoolConfig()
|
||||
wcfg.RootDir = rootDir
|
||||
defer os.RemoveAll(wcfg.RootDir)
|
||||
wcfg := cfg.DefaultConfig()
|
||||
wcfg.Mempool.RootDir = rootDir
|
||||
app := kvstore.NewKVStoreApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
appConnMem, _ := cc.NewABCIClient()
|
||||
mempool := NewMempool(wcfg, appConnMem, 10)
|
||||
mempool, cleanup := newMempoolWithAppAndConfig(cc, wcfg)
|
||||
defer cleanup()
|
||||
mempool.height = 10
|
||||
mempool.InitWAL()
|
||||
|
||||
// 4. Ensure that the directory contains the WAL file
|
||||
24
mempool/doc.go
Normal file
24
mempool/doc.go
Normal file
@@ -0,0 +1,24 @@
|
||||
// The mempool pushes new txs onto the proxyAppConn.
|
||||
// It gets a stream of (req, res) tuples from the proxy.
|
||||
// The mempool stores good txs in a concurrent linked-list.
|
||||
|
||||
// Multiple concurrent go-routines can traverse this linked-list
|
||||
// safely by calling .NextWait() on each element.
|
||||
|
||||
// So we have several go-routines:
|
||||
// 1. Consensus calling Update() and Reap() synchronously
|
||||
// 2. Many mempool reactor's peer routines calling CheckTx()
|
||||
// 3. Many mempool reactor's peer routines traversing the txs linked list
|
||||
// 4. Another goroutine calling GarbageCollectTxs() periodically
|
||||
|
||||
// To manage these goroutines, there are three methods of locking.
|
||||
// 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe)
|
||||
// 2. Mutations to the linked-list elements are atomic
|
||||
// 3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
|
||||
|
||||
// Garbage collection of old elements from mempool.txs is handlde via
|
||||
// the DetachPrev() call, which makes old elements not reachable by
|
||||
// peer broadcastTxRoutine() automatically garbage collected.
|
||||
|
||||
// TODO: Better handle abci client errors. (make it automatically handle connection errors)
|
||||
package mempool
|
||||
46
mempool/errors.go
Normal file
46
mempool/errors.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrTxInCache is returned to the client if we saw tx earlier
|
||||
ErrTxInCache = errors.New("Tx already exists in cache")
|
||||
|
||||
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
|
||||
ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize)
|
||||
)
|
||||
|
||||
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
|
||||
type ErrMempoolIsFull struct {
|
||||
numTxs int
|
||||
maxTxs int
|
||||
|
||||
txsBytes int64
|
||||
maxTxsBytes int64
|
||||
}
|
||||
|
||||
func (e ErrMempoolIsFull) Error() string {
|
||||
return fmt.Sprintf(
|
||||
"mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)",
|
||||
e.numTxs, e.maxTxs,
|
||||
e.txsBytes, e.maxTxsBytes)
|
||||
}
|
||||
|
||||
// ErrPreCheck is returned when tx is too big
|
||||
type ErrPreCheck struct {
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (e ErrPreCheck) Error() string {
|
||||
return e.Reason.Error()
|
||||
}
|
||||
|
||||
// IsPreCheckError returns true if err is due to pre check failure.
|
||||
func IsPreCheckError(err error) bool {
|
||||
_, ok := err.(ErrPreCheck)
|
||||
return ok
|
||||
}
|
||||
@@ -1,26 +1,34 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
auto "github.com/tendermint/tendermint/libs/autofile"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Mempool defines the mempool interface.
|
||||
// Updates to the mempool need to be synchronized with committing a block so
|
||||
// apps can reset their transient state on Commit.
|
||||
type Mempool interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
|
||||
Size() int
|
||||
|
||||
CheckTx(types.Tx, func(*abci.Response)) error
|
||||
CheckTxWithInfo(types.Tx, func(*abci.Response), TxInfo) error
|
||||
|
||||
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
|
||||
|
||||
Update(int64, types.Txs, PreCheckFunc, PostCheckFunc) error
|
||||
Flush()
|
||||
FlushAppConn() error
|
||||
|
||||
TxsAvailable() <-chan struct{}
|
||||
EnableTxsAvailable()
|
||||
}
|
||||
|
||||
// PreCheckFunc is an optional filter executed before CheckTx and rejects
|
||||
// transaction if false is returned. An example would be to ensure that a
|
||||
// transaction doesn't exceeded the block size.
|
||||
@@ -39,72 +47,7 @@ type TxInfo struct {
|
||||
PeerID uint16
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
The mempool pushes new txs onto the proxyAppConn.
|
||||
It gets a stream of (req, res) tuples from the proxy.
|
||||
The mempool stores good txs in a concurrent linked-list.
|
||||
|
||||
Multiple concurrent go-routines can traverse this linked-list
|
||||
safely by calling .NextWait() on each element.
|
||||
|
||||
So we have several go-routines:
|
||||
1. Consensus calling Update() and Reap() synchronously
|
||||
2. Many mempool reactor's peer routines calling CheckTx()
|
||||
3. Many mempool reactor's peer routines traversing the txs linked list
|
||||
4. Another goroutine calling GarbageCollectTxs() periodically
|
||||
|
||||
To manage these goroutines, there are three methods of locking.
|
||||
1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe)
|
||||
2. Mutations to the linked-list elements are atomic
|
||||
3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
|
||||
|
||||
Garbage collection of old elements from mempool.txs is handlde via
|
||||
the DetachPrev() call, which makes old elements not reachable by
|
||||
peer broadcastTxRoutine() automatically garbage collected.
|
||||
|
||||
TODO: Better handle abci client errors. (make it automatically handle connection errors)
|
||||
|
||||
*/
|
||||
|
||||
var (
|
||||
// ErrTxInCache is returned to the client if we saw tx earlier
|
||||
ErrTxInCache = errors.New("Tx already exists in cache")
|
||||
|
||||
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
|
||||
ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize)
|
||||
)
|
||||
|
||||
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
|
||||
type ErrMempoolIsFull struct {
|
||||
numTxs int
|
||||
maxTxs int
|
||||
|
||||
txsBytes int64
|
||||
maxTxsBytes int64
|
||||
}
|
||||
|
||||
func (e ErrMempoolIsFull) Error() string {
|
||||
return fmt.Sprintf(
|
||||
"Mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)",
|
||||
e.numTxs, e.maxTxs,
|
||||
e.txsBytes, e.maxTxsBytes)
|
||||
}
|
||||
|
||||
// ErrPreCheck is returned when tx is too big
|
||||
type ErrPreCheck struct {
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (e ErrPreCheck) Error() string {
|
||||
return e.Reason.Error()
|
||||
}
|
||||
|
||||
// IsPreCheckError returns true if err is due to pre check failure.
|
||||
func IsPreCheckError(err error) bool {
|
||||
_, ok := err.(ErrPreCheck)
|
||||
return ok
|
||||
}
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino
|
||||
// overhead is smaller or equal to the expected maxBytes.
|
||||
@@ -143,718 +86,3 @@ func PostCheckMaxGas(maxGas int64) PostCheckFunc {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TxID is the hex encoded hash of the bytes as a types.Tx.
|
||||
func TxID(tx []byte) string {
|
||||
return fmt.Sprintf("%X", types.Tx(tx).Hash())
|
||||
}
|
||||
|
||||
// txKey is the fixed length array sha256 hash used as the key in maps.
|
||||
func txKey(tx types.Tx) [sha256.Size]byte {
|
||||
return sha256.Sum256(tx)
|
||||
}
|
||||
|
||||
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
|
||||
// round. Transaction validity is checked using the CheckTx abci message before the transaction is
|
||||
// added to the pool. The Mempool uses a concurrent list structure for storing transactions that
|
||||
// can be efficiently accessed by multiple concurrent readers.
|
||||
type Mempool struct {
|
||||
config *cfg.MempoolConfig
|
||||
|
||||
proxyMtx sync.Mutex
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
preCheck PreCheckFunc
|
||||
postCheck PostCheckFunc
|
||||
|
||||
// Track whether we're rechecking txs.
|
||||
// These are not protected by a mutex and are expected to be mutated
|
||||
// in serial (ie. by abci responses which are called in serial).
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
recheckEnd *clist.CElement // re-checking stops here
|
||||
|
||||
// notify listeners (ie. consensus) when txs are available
|
||||
notifiedTxsAvailable bool
|
||||
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
|
||||
|
||||
// Map for quick access to txs to record sender in CheckTx.
|
||||
// txsMap: txKey -> CElement
|
||||
txsMap sync.Map
|
||||
|
||||
// Atomic integers
|
||||
height int64 // the last block Update()'d to
|
||||
rechecking int32 // for re-checking filtered txs on Update()
|
||||
txsBytes int64 // total size of mempool, in bytes
|
||||
|
||||
// Keep a cache of already-seen txs.
|
||||
// This reduces the pressure on the proxyApp.
|
||||
cache txCache
|
||||
|
||||
// A log of mempool txs
|
||||
wal *auto.AutoFile
|
||||
|
||||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// MempoolOption sets an optional parameter on the Mempool.
|
||||
type MempoolOption func(*Mempool)
|
||||
|
||||
// NewMempool returns a new Mempool with the given configuration and connection to an application.
|
||||
func NewMempool(
|
||||
config *cfg.MempoolConfig,
|
||||
proxyAppConn proxy.AppConnMempool,
|
||||
height int64,
|
||||
options ...MempoolOption,
|
||||
) *Mempool {
|
||||
mempool := &Mempool{
|
||||
config: config,
|
||||
proxyAppConn: proxyAppConn,
|
||||
txs: clist.New(),
|
||||
height: height,
|
||||
rechecking: 0,
|
||||
recheckCursor: nil,
|
||||
recheckEnd: nil,
|
||||
logger: log.NewNopLogger(),
|
||||
metrics: NopMetrics(),
|
||||
}
|
||||
if config.CacheSize > 0 {
|
||||
mempool.cache = newMapTxCache(config.CacheSize)
|
||||
} else {
|
||||
mempool.cache = nopTxCache{}
|
||||
}
|
||||
proxyAppConn.SetResponseCallback(mempool.globalCb)
|
||||
for _, option := range options {
|
||||
option(mempool)
|
||||
}
|
||||
return mempool
|
||||
}
|
||||
|
||||
// EnableTxsAvailable initializes the TxsAvailable channel,
|
||||
// ensuring it will trigger once every height when transactions are available.
|
||||
// NOTE: not thread safe - should only be called once, on startup
|
||||
func (mem *Mempool) EnableTxsAvailable() {
|
||||
mem.txsAvailable = make(chan struct{}, 1)
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger.
|
||||
func (mem *Mempool) SetLogger(l log.Logger) {
|
||||
mem.logger = l
|
||||
}
|
||||
|
||||
// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
|
||||
// false. This is ran before CheckTx.
|
||||
func WithPreCheck(f PreCheckFunc) MempoolOption {
|
||||
return func(mem *Mempool) { mem.preCheck = f }
|
||||
}
|
||||
|
||||
// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
|
||||
// false. This is ran after CheckTx.
|
||||
func WithPostCheck(f PostCheckFunc) MempoolOption {
|
||||
return func(mem *Mempool) { mem.postCheck = f }
|
||||
}
|
||||
|
||||
// WithMetrics sets the metrics.
|
||||
func WithMetrics(metrics *Metrics) MempoolOption {
|
||||
return func(mem *Mempool) { mem.metrics = metrics }
|
||||
}
|
||||
|
||||
// InitWAL creates a directory for the WAL file and opens a file itself.
|
||||
//
|
||||
// *panics* if can't create directory or open file.
|
||||
// *not thread safe*
|
||||
func (mem *Mempool) InitWAL() {
|
||||
walDir := mem.config.WalDir()
|
||||
err := cmn.EnsureDir(walDir, 0700)
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "Error ensuring Mempool WAL dir"))
|
||||
}
|
||||
af, err := auto.OpenAutoFile(walDir + "/wal")
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "Error opening Mempool WAL file"))
|
||||
}
|
||||
mem.wal = af
|
||||
}
|
||||
|
||||
// CloseWAL closes and discards the underlying WAL file.
|
||||
// Any further writes will not be relayed to disk.
|
||||
func (mem *Mempool) CloseWAL() {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
if err := mem.wal.Close(); err != nil {
|
||||
mem.logger.Error("Error closing WAL", "err", err)
|
||||
}
|
||||
mem.wal = nil
|
||||
}
|
||||
|
||||
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
|
||||
func (mem *Mempool) Lock() {
|
||||
mem.proxyMtx.Lock()
|
||||
}
|
||||
|
||||
// Unlock unlocks the mempool.
|
||||
func (mem *Mempool) Unlock() {
|
||||
mem.proxyMtx.Unlock()
|
||||
}
|
||||
|
||||
// Size returns the number of transactions in the mempool.
|
||||
func (mem *Mempool) Size() int {
|
||||
return mem.txs.Len()
|
||||
}
|
||||
|
||||
// TxsBytes returns the total size of all txs in the mempool.
|
||||
func (mem *Mempool) TxsBytes() int64 {
|
||||
return atomic.LoadInt64(&mem.txsBytes)
|
||||
}
|
||||
|
||||
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
|
||||
// done. E.g. from CheckTx.
|
||||
func (mem *Mempool) FlushAppConn() error {
|
||||
return mem.proxyAppConn.FlushSync()
|
||||
}
|
||||
|
||||
// Flush removes all transactions from the mempool and cache
|
||||
func (mem *Mempool) Flush() {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
mem.cache.Reset()
|
||||
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
mem.txs.Remove(e)
|
||||
e.DetachPrev()
|
||||
}
|
||||
|
||||
mem.txsMap = sync.Map{}
|
||||
_ = atomic.SwapInt64(&mem.txsBytes, 0)
|
||||
}
|
||||
|
||||
// TxsFront returns the first transaction in the ordered list for peer
|
||||
// goroutines to call .NextWait() on.
|
||||
func (mem *Mempool) TxsFront() *clist.CElement {
|
||||
return mem.txs.Front()
|
||||
}
|
||||
|
||||
// TxsWaitChan returns a channel to wait on transactions. It will be closed
|
||||
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
|
||||
// element)
|
||||
func (mem *Mempool) TxsWaitChan() <-chan struct{} {
|
||||
return mem.txs.WaitChan()
|
||||
}
|
||||
|
||||
// CheckTx executes a new transaction against the application to determine its validity
|
||||
// and whether it should be added to the mempool.
|
||||
// It blocks if we're waiting on Update() or Reap().
|
||||
// cb: A callback from the CheckTx command.
|
||||
// It gets called from another goroutine.
|
||||
// CONTRACT: Either cb will get called, or err returned.
|
||||
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
|
||||
return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID})
|
||||
}
|
||||
|
||||
// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx.
|
||||
// Currently this metadata is the peer who sent it,
|
||||
// used to prevent the tx from being gossiped back to them.
|
||||
func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
|
||||
mem.proxyMtx.Lock()
|
||||
// use defer to unlock mutex because application (*local client*) might panic
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
var (
|
||||
memSize = mem.Size()
|
||||
txsBytes = mem.TxsBytes()
|
||||
)
|
||||
if memSize >= mem.config.Size ||
|
||||
int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
|
||||
return ErrMempoolIsFull{
|
||||
memSize, mem.config.Size,
|
||||
txsBytes, mem.config.MaxTxsBytes}
|
||||
}
|
||||
|
||||
// The size of the corresponding amino-encoded TxMessage
|
||||
// can't be larger than the maxMsgSize, otherwise we can't
|
||||
// relay it to peers.
|
||||
if len(tx) > maxTxSize {
|
||||
return ErrTxTooLarge
|
||||
}
|
||||
|
||||
if mem.preCheck != nil {
|
||||
if err := mem.preCheck(tx); err != nil {
|
||||
return ErrPreCheck{err}
|
||||
}
|
||||
}
|
||||
|
||||
// CACHE
|
||||
if !mem.cache.Push(tx) {
|
||||
// Record a new sender for a tx we've already seen.
|
||||
// Note it's possible a tx is still in the cache but no longer in the mempool
|
||||
// (eg. after committing a block, txs are removed from mempool but not cache),
|
||||
// so we only record the sender for txs still in the mempool.
|
||||
if e, ok := mem.txsMap.Load(txKey(tx)); ok {
|
||||
memTx := e.(*clist.CElement).Value.(*mempoolTx)
|
||||
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
|
||||
// TODO: consider punishing peer for dups,
|
||||
// its non-trivial since invalid txs can become valid,
|
||||
// but they can spam the same tx with little cost to them atm.
|
||||
}
|
||||
}
|
||||
|
||||
return ErrTxInCache
|
||||
}
|
||||
// END CACHE
|
||||
|
||||
// WAL
|
||||
if mem.wal != nil {
|
||||
// TODO: Notify administrators when WAL fails
|
||||
_, err := mem.wal.Write([]byte(tx))
|
||||
if err != nil {
|
||||
mem.logger.Error("Error writing to WAL", "err", err)
|
||||
}
|
||||
_, err = mem.wal.Write([]byte("\n"))
|
||||
if err != nil {
|
||||
mem.logger.Error("Error writing to WAL", "err", err)
|
||||
}
|
||||
}
|
||||
// END WAL
|
||||
|
||||
// NOTE: proxyAppConn may error if tx buffer is full
|
||||
if err = mem.proxyAppConn.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
|
||||
reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Global callback that will be called after every ABCI response.
|
||||
// Having a single global callback avoids needing to set a callback for each request.
|
||||
// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
|
||||
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
|
||||
// include this information. If we're not in the midst of a recheck, this function will just return,
|
||||
// so the request specific callback can do the work.
|
||||
// When rechecking, we don't need the peerID, so the recheck callback happens here.
|
||||
func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) {
|
||||
if mem.recheckCursor == nil {
|
||||
return
|
||||
}
|
||||
|
||||
mem.metrics.RecheckTimes.Add(1)
|
||||
mem.resCbRecheck(req, res)
|
||||
|
||||
// update metrics
|
||||
mem.metrics.Size.Set(float64(mem.Size()))
|
||||
}
|
||||
|
||||
// Request specific callback that should be set on individual reqRes objects
|
||||
// to incorporate local information when processing the response.
|
||||
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
|
||||
// NOTE: alternatively, we could include this information in the ABCI request itself.
|
||||
//
|
||||
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
|
||||
// when all other response processing is complete.
|
||||
//
|
||||
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
|
||||
func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
|
||||
return func(res *abci.Response) {
|
||||
if mem.recheckCursor != nil {
|
||||
// this should never happen
|
||||
panic("recheck cursor is not nil in reqResCb")
|
||||
}
|
||||
|
||||
mem.resCbFirstTime(tx, peerID, res)
|
||||
|
||||
// update metrics
|
||||
mem.metrics.Size.Set(float64(mem.Size()))
|
||||
|
||||
// passed in by the caller of CheckTx, eg. the RPC
|
||||
if externalCb != nil {
|
||||
externalCb(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called from:
|
||||
// - resCbFirstTime (lock not held) if tx is valid
|
||||
func (mem *Mempool) addTx(memTx *mempoolTx) {
|
||||
e := mem.txs.PushBack(memTx)
|
||||
mem.txsMap.Store(txKey(memTx.tx), e)
|
||||
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
|
||||
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
|
||||
}
|
||||
|
||||
// Called from:
|
||||
// - Update (lock held) if tx was committed
|
||||
// - resCbRecheck (lock not held) if tx was invalidated
|
||||
func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
|
||||
mem.txs.Remove(elem)
|
||||
elem.DetachPrev()
|
||||
mem.txsMap.Delete(txKey(tx))
|
||||
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
|
||||
|
||||
if removeFromCache {
|
||||
mem.cache.Remove(tx)
|
||||
}
|
||||
}
|
||||
|
||||
// callback, which is called after the app checked the tx for the first time.
|
||||
//
|
||||
// The case where the app checks the tx for the second and subsequent times is
|
||||
// handled by the resCbRecheck callback.
|
||||
func (mem *Mempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) {
|
||||
switch r := res.Value.(type) {
|
||||
case *abci.Response_CheckTx:
|
||||
var postCheckErr error
|
||||
if mem.postCheck != nil {
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
}
|
||||
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
||||
memTx := &mempoolTx{
|
||||
height: mem.height,
|
||||
gasWanted: r.CheckTx.GasWanted,
|
||||
tx: tx,
|
||||
}
|
||||
memTx.senders.Store(peerID, true)
|
||||
mem.addTx(memTx)
|
||||
mem.logger.Info("Added good transaction",
|
||||
"tx", TxID(tx),
|
||||
"res", r,
|
||||
"height", memTx.height,
|
||||
"total", mem.Size(),
|
||||
)
|
||||
mem.notifyTxsAvailable()
|
||||
} else {
|
||||
// ignore bad transaction
|
||||
mem.logger.Info("Rejected bad transaction", "tx", TxID(tx), "res", r, "err", postCheckErr)
|
||||
mem.metrics.FailedTxs.Add(1)
|
||||
// remove from cache (it might be good later)
|
||||
mem.cache.Remove(tx)
|
||||
}
|
||||
default:
|
||||
// ignore other messages
|
||||
}
|
||||
}
|
||||
|
||||
// callback, which is called after the app rechecked the tx.
|
||||
//
|
||||
// The case where the app checks the tx for the first time is handled by the
|
||||
// resCbFirstTime callback.
|
||||
func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
||||
switch r := res.Value.(type) {
|
||||
case *abci.Response_CheckTx:
|
||||
tx := req.GetCheckTx().Tx
|
||||
memTx := mem.recheckCursor.Value.(*mempoolTx)
|
||||
if !bytes.Equal(tx, memTx.tx) {
|
||||
panic(fmt.Sprintf(
|
||||
"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
|
||||
memTx.tx,
|
||||
tx))
|
||||
}
|
||||
var postCheckErr error
|
||||
if mem.postCheck != nil {
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
}
|
||||
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
||||
// Good, nothing to do.
|
||||
} else {
|
||||
// Tx became invalidated due to newly committed block.
|
||||
mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr)
|
||||
// NOTE: we remove tx from the cache because it might be good later
|
||||
mem.removeTx(tx, mem.recheckCursor, true)
|
||||
}
|
||||
if mem.recheckCursor == mem.recheckEnd {
|
||||
mem.recheckCursor = nil
|
||||
} else {
|
||||
mem.recheckCursor = mem.recheckCursor.Next()
|
||||
}
|
||||
if mem.recheckCursor == nil {
|
||||
// Done!
|
||||
atomic.StoreInt32(&mem.rechecking, 0)
|
||||
mem.logger.Info("Done rechecking txs")
|
||||
|
||||
// incase the recheck removed all txs
|
||||
if mem.Size() > 0 {
|
||||
mem.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
default:
|
||||
// ignore other messages
|
||||
}
|
||||
}
|
||||
|
||||
// TxsAvailable returns a channel which fires once for every height,
|
||||
// and only when transactions are available in the mempool.
|
||||
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
|
||||
func (mem *Mempool) TxsAvailable() <-chan struct{} {
|
||||
return mem.txsAvailable
|
||||
}
|
||||
|
||||
func (mem *Mempool) notifyTxsAvailable() {
|
||||
if mem.Size() == 0 {
|
||||
panic("notified txs available but mempool is empty!")
|
||||
}
|
||||
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
|
||||
// channel cap is 1, so this will send once
|
||||
mem.notifiedTxsAvailable = true
|
||||
select {
|
||||
case mem.txsAvailable <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total
|
||||
// with the condition that the total gasWanted must be less than maxGas.
|
||||
// If both maxes are negative, there is no cap on the size of all returned
|
||||
// transactions (~ all available transactions).
|
||||
func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
||||
// TODO: Something better?
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
var totalGas int64
|
||||
// TODO: we will get a performance boost if we have a good estimate of avg
|
||||
// size per tx, and set the initial capacity based off of that.
|
||||
// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
|
||||
txs := make([]types.Tx, 0, mem.txs.Len())
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
// Check total size requirement
|
||||
aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1)
|
||||
if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes {
|
||||
return txs
|
||||
}
|
||||
totalBytes += int64(len(memTx.tx)) + aminoOverhead
|
||||
// Check total gas requirement.
|
||||
// If maxGas is negative, skip this check.
|
||||
// Since newTotalGas < masGas, which
|
||||
// must be non-negative, it follows that this won't overflow.
|
||||
newTotalGas := totalGas + memTx.gasWanted
|
||||
if maxGas > -1 && newTotalGas > maxGas {
|
||||
return txs
|
||||
}
|
||||
totalGas = newTotalGas
|
||||
txs = append(txs, memTx.tx)
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// ReapMaxTxs reaps up to max transactions from the mempool.
|
||||
// If max is negative, there is no cap on the size of all returned
|
||||
// transactions (~ all available transactions).
|
||||
func (mem *Mempool) ReapMaxTxs(max int) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
if max < 0 {
|
||||
max = mem.txs.Len()
|
||||
}
|
||||
|
||||
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
||||
// TODO: Something better?
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
|
||||
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
txs = append(txs, memTx.tx)
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// Update informs the mempool that the given txs were committed and can be discarded.
|
||||
// NOTE: this should be called *after* block is committed by consensus.
|
||||
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
||||
func (mem *Mempool) Update(
|
||||
height int64,
|
||||
txs types.Txs,
|
||||
preCheck PreCheckFunc,
|
||||
postCheck PostCheckFunc,
|
||||
) error {
|
||||
// Set height
|
||||
mem.height = height
|
||||
mem.notifiedTxsAvailable = false
|
||||
|
||||
if preCheck != nil {
|
||||
mem.preCheck = preCheck
|
||||
}
|
||||
if postCheck != nil {
|
||||
mem.postCheck = postCheck
|
||||
}
|
||||
|
||||
// Add committed transactions to cache (if missing).
|
||||
for _, tx := range txs {
|
||||
_ = mem.cache.Push(tx)
|
||||
}
|
||||
|
||||
// Remove committed transactions.
|
||||
txsLeft := mem.removeTxs(txs)
|
||||
|
||||
// Either recheck non-committed txs to see if they became invalid
|
||||
// or just notify there're some txs left.
|
||||
if len(txsLeft) > 0 {
|
||||
if mem.config.Recheck {
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
|
||||
mem.recheckTxs(txsLeft)
|
||||
// At this point, mem.txs are being rechecked.
|
||||
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
|
||||
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
|
||||
} else {
|
||||
mem.notifyTxsAvailable()
|
||||
}
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
mem.metrics.Size.Set(float64(mem.Size()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
|
||||
// Build a map for faster lookups.
|
||||
txsMap := make(map[string]struct{}, len(txs))
|
||||
for _, tx := range txs {
|
||||
txsMap[string(tx)] = struct{}{}
|
||||
}
|
||||
|
||||
txsLeft := make([]types.Tx, 0, mem.txs.Len())
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
// Remove the tx if it's already in a block.
|
||||
if _, ok := txsMap[string(memTx.tx)]; ok {
|
||||
// NOTE: we don't remove committed txs from the cache.
|
||||
mem.removeTx(memTx.tx, e, false)
|
||||
|
||||
continue
|
||||
}
|
||||
txsLeft = append(txsLeft, memTx.tx)
|
||||
}
|
||||
return txsLeft
|
||||
}
|
||||
|
||||
// NOTE: pass in txs because mem.txs can mutate concurrently.
|
||||
func (mem *Mempool) recheckTxs(txs []types.Tx) {
|
||||
if len(txs) == 0 {
|
||||
return
|
||||
}
|
||||
atomic.StoreInt32(&mem.rechecking, 1)
|
||||
mem.recheckCursor = mem.txs.Front()
|
||||
mem.recheckEnd = mem.txs.Back()
|
||||
|
||||
// Push txs to proxyAppConn
|
||||
// NOTE: globalCb may be called concurrently.
|
||||
for _, tx := range txs {
|
||||
mem.proxyAppConn.CheckTxAsync(tx)
|
||||
}
|
||||
mem.proxyAppConn.FlushAsync()
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
// mempoolTx is a transaction that successfully ran
|
||||
type mempoolTx struct {
|
||||
height int64 // height that this tx had been validated in
|
||||
gasWanted int64 // amount of gas this tx states it will require
|
||||
tx types.Tx //
|
||||
|
||||
// ids of peers who've sent us this tx (as a map for quick lookups).
|
||||
// senders: PeerID -> bool
|
||||
senders sync.Map
|
||||
}
|
||||
|
||||
// Height returns the height for this transaction
|
||||
func (memTx *mempoolTx) Height() int64 {
|
||||
return atomic.LoadInt64(&memTx.height)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
type txCache interface {
|
||||
Reset()
|
||||
Push(tx types.Tx) bool
|
||||
Remove(tx types.Tx)
|
||||
}
|
||||
|
||||
// mapTxCache maintains a LRU cache of transactions. This only stores the hash
|
||||
// of the tx, due to memory concerns.
|
||||
type mapTxCache struct {
|
||||
mtx sync.Mutex
|
||||
size int
|
||||
map_ map[[sha256.Size]byte]*list.Element
|
||||
list *list.List
|
||||
}
|
||||
|
||||
var _ txCache = (*mapTxCache)(nil)
|
||||
|
||||
// newMapTxCache returns a new mapTxCache.
|
||||
func newMapTxCache(cacheSize int) *mapTxCache {
|
||||
return &mapTxCache{
|
||||
size: cacheSize,
|
||||
map_: make(map[[sha256.Size]byte]*list.Element, cacheSize),
|
||||
list: list.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Reset resets the cache to an empty state.
|
||||
func (cache *mapTxCache) Reset() {
|
||||
cache.mtx.Lock()
|
||||
cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size)
|
||||
cache.list.Init()
|
||||
cache.mtx.Unlock()
|
||||
}
|
||||
|
||||
// Push adds the given tx to the cache and returns true. It returns
|
||||
// false if tx is already in the cache.
|
||||
func (cache *mapTxCache) Push(tx types.Tx) bool {
|
||||
cache.mtx.Lock()
|
||||
defer cache.mtx.Unlock()
|
||||
|
||||
// Use the tx hash in the cache
|
||||
txHash := txKey(tx)
|
||||
if moved, exists := cache.map_[txHash]; exists {
|
||||
cache.list.MoveToBack(moved)
|
||||
return false
|
||||
}
|
||||
|
||||
if cache.list.Len() >= cache.size {
|
||||
popped := cache.list.Front()
|
||||
poppedTxHash := popped.Value.([sha256.Size]byte)
|
||||
delete(cache.map_, poppedTxHash)
|
||||
if popped != nil {
|
||||
cache.list.Remove(popped)
|
||||
}
|
||||
}
|
||||
e := cache.list.PushBack(txHash)
|
||||
cache.map_[txHash] = e
|
||||
return true
|
||||
}
|
||||
|
||||
// Remove removes the given tx from the cache.
|
||||
func (cache *mapTxCache) Remove(tx types.Tx) {
|
||||
cache.mtx.Lock()
|
||||
txHash := txKey(tx)
|
||||
popped := cache.map_[txHash]
|
||||
delete(cache.map_, txHash)
|
||||
if popped != nil {
|
||||
cache.list.Remove(popped)
|
||||
}
|
||||
|
||||
cache.mtx.Unlock()
|
||||
}
|
||||
|
||||
type nopTxCache struct{}
|
||||
|
||||
var _ txCache = (*nopTxCache)(nil)
|
||||
|
||||
func (nopTxCache) Reset() {}
|
||||
func (nopTxCache) Push(types.Tx) bool { return true }
|
||||
func (nopTxCache) Remove(types.Tx) {}
|
||||
|
||||
36
mempool/mock/mempool.go
Normal file
36
mempool/mock/mempool.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Mempool is an empty implementation of a Mempool, useful for testing.
|
||||
type Mempool struct{}
|
||||
|
||||
var _ mempl.Mempool = Mempool{}
|
||||
|
||||
func (Mempool) Lock() {}
|
||||
func (Mempool) Unlock() {}
|
||||
func (Mempool) Size() int { return 0 }
|
||||
func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
|
||||
return nil
|
||||
}
|
||||
func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
|
||||
_ mempl.TxInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
|
||||
func (Mempool) Update(
|
||||
_ int64,
|
||||
_ types.Txs,
|
||||
_ mempl.PreCheckFunc,
|
||||
_ mempl.PostCheckFunc,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
func (Mempool) Flush() {}
|
||||
func (Mempool) FlushAppConn() error { return nil }
|
||||
func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
|
||||
func (Mempool) EnableTxsAvailable() {}
|
||||
@@ -31,13 +31,25 @@ const (
|
||||
maxActiveIDs = math.MaxUint16
|
||||
)
|
||||
|
||||
// MempoolWithWait extends the standard Mempool interface to allow reactor to
|
||||
// wait for transactions and iterate on them once there are any. Also it
|
||||
// includes ReapMaxTxs function, which is useful for testing.
|
||||
//
|
||||
// UNSTABLE
|
||||
type MempoolWithWait interface {
|
||||
Mempool
|
||||
TxsFront() *clist.CElement
|
||||
TxsWaitChan() <-chan struct{}
|
||||
ReapMaxTxs(n int) types.Txs
|
||||
}
|
||||
|
||||
// MempoolReactor handles mempool tx broadcasting amongst peers.
|
||||
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
|
||||
// peers you received it from.
|
||||
type MempoolReactor struct {
|
||||
p2p.BaseReactor
|
||||
config *cfg.MempoolConfig
|
||||
Mempool *Mempool
|
||||
Mempool MempoolWithWait
|
||||
ids *mempoolIDs
|
||||
}
|
||||
|
||||
@@ -105,7 +117,7 @@ func newMempoolIDs() *mempoolIDs {
|
||||
}
|
||||
|
||||
// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
|
||||
func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
|
||||
func NewMempoolReactor(config *cfg.MempoolConfig, mempool MempoolWithWait) *MempoolReactor {
|
||||
memR := &MempoolReactor{
|
||||
config: config,
|
||||
Mempool: mempool,
|
||||
@@ -115,10 +127,18 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
|
||||
return memR
|
||||
}
|
||||
|
||||
type mempoolWithLogger interface {
|
||||
SetLogger(log.Logger)
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger on the reactor and the underlying Mempool.
|
||||
func (memR *MempoolReactor) SetLogger(l log.Logger) {
|
||||
memR.Logger = l
|
||||
memR.Mempool.SetLogger(l)
|
||||
|
||||
// set mempoolWithLogger if mempool supports it
|
||||
if mem, ok := memR.Mempool.(mempoolWithLogger); ok {
|
||||
mem.SetLogger(l)
|
||||
}
|
||||
}
|
||||
|
||||
// OnStart implements p2p.BaseReactor.
|
||||
@@ -169,7 +189,7 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
peerID := memR.ids.GetForPeer(src)
|
||||
err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID})
|
||||
if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
|
||||
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
|
||||
}
|
||||
// broadcasting happens from go routines per peer
|
||||
default:
|
||||
|
||||
20
node/node.go
20
node/node.go
@@ -157,9 +157,10 @@ type Node struct {
|
||||
// services
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
stateDB dbm.DB
|
||||
blockStore *bc.BlockStore // store the blockchain to disk
|
||||
bcReactor *bc.BlockchainReactor // for fast-syncing
|
||||
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
|
||||
blockStore *bc.BlockStore // store the blockchain to disk
|
||||
bcReactor *bc.BlockchainReactor // for fast-syncing
|
||||
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
|
||||
mempool *mempl.CListMempool
|
||||
consensusState *cs.ConsensusState // latest consensus state
|
||||
consensusReactor *cs.ConsensusReactor // for participating in the consensus
|
||||
evidencePool *evidence.EvidencePool // tracking evidence
|
||||
@@ -320,7 +321,7 @@ func NewNode(config *cfg.Config,
|
||||
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
|
||||
|
||||
// Make MempoolReactor
|
||||
mempool := mempl.NewMempool(
|
||||
mempool := mempl.NewCListMempool(
|
||||
config.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
@@ -329,7 +330,6 @@ func NewNode(config *cfg.Config,
|
||||
mempl.WithPostCheck(sm.TxPostCheck(state)),
|
||||
)
|
||||
mempoolLogger := logger.With("module", "mempool")
|
||||
mempool.SetLogger(mempoolLogger)
|
||||
if config.Mempool.WalEnabled() {
|
||||
mempool.InitWAL() // no need to have the mempool wal during tests
|
||||
}
|
||||
@@ -534,6 +534,7 @@ func NewNode(config *cfg.Config,
|
||||
blockStore: blockStore,
|
||||
bcReactor: bcReactor,
|
||||
mempoolReactor: mempoolReactor,
|
||||
mempool: mempool,
|
||||
consensusState: consensusState,
|
||||
consensusReactor: consensusReactor,
|
||||
evidencePool: evidencePool,
|
||||
@@ -617,7 +618,7 @@ func (n *Node) OnStop() {
|
||||
|
||||
// stop mempool WAL
|
||||
if n.config.Mempool.WalEnabled() {
|
||||
n.mempoolReactor.Mempool.CloseWAL()
|
||||
n.mempool.CloseWAL()
|
||||
}
|
||||
|
||||
if err := n.transport.Close(); err != nil {
|
||||
@@ -652,7 +653,7 @@ func (n *Node) ConfigureRPC() {
|
||||
rpccore.SetStateDB(n.stateDB)
|
||||
rpccore.SetBlockStore(n.blockStore)
|
||||
rpccore.SetConsensusState(n.consensusState)
|
||||
rpccore.SetMempool(n.mempoolReactor.Mempool)
|
||||
rpccore.SetMempool(n.mempool)
|
||||
rpccore.SetEvidencePool(n.evidencePool)
|
||||
rpccore.SetP2PPeers(n.sw)
|
||||
rpccore.SetP2PTransport(n)
|
||||
@@ -804,6 +805,11 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
|
||||
return n.mempoolReactor
|
||||
}
|
||||
|
||||
// Mempool returns the Node's Mempool.
|
||||
func (n *Node) Mempool() *mempl.CListMempool {
|
||||
return n.mempool
|
||||
}
|
||||
|
||||
// EvidencePool returns the Node's EvidencePool.
|
||||
func (n *Node) EvidencePool() *evidence.EvidencePool {
|
||||
return n.evidencePool
|
||||
|
||||
@@ -224,7 +224,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
|
||||
// Make Mempool
|
||||
memplMetrics := mempl.PrometheusMetrics("node_test")
|
||||
mempool := mempl.NewMempool(
|
||||
mempool := mempl.NewCListMempool(
|
||||
config.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
|
||||
@@ -249,7 +249,8 @@ func TestAppCalls(t *testing.T) {
|
||||
func TestBroadcastTxSync(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
mempool := node.MempoolReactor().Mempool
|
||||
// TODO (melekes): use mempool which is set on RPC rather than getting it from node
|
||||
mempool := node.Mempool()
|
||||
initMempoolSize := mempool.Size()
|
||||
|
||||
for i, c := range GetClients() {
|
||||
@@ -269,7 +270,7 @@ func TestBroadcastTxSync(t *testing.T) {
|
||||
func TestBroadcastTxCommit(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
mempool := node.MempoolReactor().Mempool
|
||||
mempool := node.Mempool()
|
||||
for i, c := range GetClients() {
|
||||
_, _, tx := MakeTxKV()
|
||||
bres, err := c.BroadcastTxCommit(tx)
|
||||
@@ -284,7 +285,7 @@ func TestBroadcastTxCommit(t *testing.T) {
|
||||
func TestUnconfirmedTxs(t *testing.T) {
|
||||
_, _, tx := MakeTxKV()
|
||||
|
||||
mempool := node.MempoolReactor().Mempool
|
||||
mempool := node.Mempool()
|
||||
_ = mempool.CheckTx(tx, nil)
|
||||
|
||||
for i, c := range GetClients() {
|
||||
@@ -305,7 +306,7 @@ func TestUnconfirmedTxs(t *testing.T) {
|
||||
func TestNumUnconfirmedTxs(t *testing.T) {
|
||||
_, _, tx := MakeTxKV()
|
||||
|
||||
mempool := node.MempoolReactor().Mempool
|
||||
mempool := node.Mempool()
|
||||
_ = mempool.CheckTx(tx, nil)
|
||||
mempoolSize := mempool.Size()
|
||||
|
||||
|
||||
@@ -49,6 +49,14 @@ type peers interface {
|
||||
Peers() p2p.IPeerSet
|
||||
}
|
||||
|
||||
// Mempool extends the standard Mempool interface to allow getting
|
||||
// total txs size. ReapMaxTxs is used by UnconfirmedTxs to reap N transactions.
|
||||
type Mempool interface {
|
||||
mempl.Mempool
|
||||
ReapMaxTxs(n int) types.Txs
|
||||
TxsBytes() int64
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// These package level globals come with setters
|
||||
// that are expected to be called only once, on startup
|
||||
@@ -72,7 +80,7 @@ var (
|
||||
txIndexer txindex.TxIndexer
|
||||
consensusReactor *consensus.ConsensusReactor
|
||||
eventBus *types.EventBus // thread safe
|
||||
mempool *mempl.Mempool
|
||||
mempool Mempool
|
||||
|
||||
logger log.Logger
|
||||
|
||||
@@ -87,7 +95,7 @@ func SetBlockStore(bs sm.BlockStore) {
|
||||
blockStore = bs
|
||||
}
|
||||
|
||||
func SetMempool(mem *mempl.Mempool) {
|
||||
func SetMempool(mem Mempool) {
|
||||
mempool = mem
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/fail"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -30,7 +31,7 @@ type BlockExecutor struct {
|
||||
|
||||
// manage the mempool lock during commit
|
||||
// and update both with block results after commit.
|
||||
mempool Mempool
|
||||
mempool mempl.Mempool
|
||||
evpool EvidencePool
|
||||
|
||||
logger log.Logger
|
||||
@@ -48,7 +49,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
|
||||
|
||||
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
|
||||
// Call SetEventBus to provide one.
|
||||
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
|
||||
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool mempl.Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
|
||||
res := &BlockExecutor{
|
||||
db: db,
|
||||
proxyApp: proxyApp,
|
||||
|
||||
@@ -16,10 +16,10 @@ import (
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
|
||||
memplmock "github.com/tendermint/tendermint/mempool/mock"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -38,7 +38,7 @@ func TestApplyBlock(t *testing.T) {
|
||||
state, stateDB := state(1, 1)
|
||||
|
||||
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
|
||||
MockMempool{}, MockEvidencePool{})
|
||||
memplmock.Mempool{}, MockEvidencePool{})
|
||||
|
||||
block := makeBlock(state, 1)
|
||||
blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}
|
||||
@@ -310,7 +310,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
|
||||
|
||||
state, stateDB := state(1, 1)
|
||||
|
||||
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), MockMempool{}, MockEvidencePool{})
|
||||
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), memplmock.Mempool{}, MockEvidencePool{})
|
||||
|
||||
eventBus := types.NewEventBus()
|
||||
err = eventBus.Start()
|
||||
@@ -367,7 +367,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
|
||||
defer proxyApp.Stop()
|
||||
|
||||
state, stateDB := state(1, 1)
|
||||
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), MockMempool{}, MockEvidencePool{})
|
||||
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), memplmock.Mempool{}, MockEvidencePool{})
|
||||
|
||||
block := makeBlock(state, 1)
|
||||
blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -11,57 +9,6 @@ import (
|
||||
// NOTE: Interfaces used by RPC must be thread safe!
|
||||
//------------------------------------------------------
|
||||
|
||||
//------------------------------------------------------
|
||||
// mempool
|
||||
|
||||
// Mempool defines the mempool interface as used by the ConsensusState.
|
||||
// Updates to the mempool need to be synchronized with committing a block
|
||||
// so apps can reset their transient state on Commit
|
||||
type Mempool interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
|
||||
Size() int
|
||||
CheckTx(types.Tx, func(*abci.Response)) error
|
||||
CheckTxWithInfo(types.Tx, func(*abci.Response), mempool.TxInfo) error
|
||||
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
|
||||
Update(int64, types.Txs, mempool.PreCheckFunc, mempool.PostCheckFunc) error
|
||||
Flush()
|
||||
FlushAppConn() error
|
||||
|
||||
TxsAvailable() <-chan struct{}
|
||||
EnableTxsAvailable()
|
||||
}
|
||||
|
||||
// MockMempool is an empty implementation of a Mempool, useful for testing.
|
||||
type MockMempool struct{}
|
||||
|
||||
var _ Mempool = MockMempool{}
|
||||
|
||||
func (MockMempool) Lock() {}
|
||||
func (MockMempool) Unlock() {}
|
||||
func (MockMempool) Size() int { return 0 }
|
||||
func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
|
||||
return nil
|
||||
}
|
||||
func (MockMempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
|
||||
_ mempool.TxInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
|
||||
func (MockMempool) Update(
|
||||
_ int64,
|
||||
_ types.Txs,
|
||||
_ mempool.PreCheckFunc,
|
||||
_ mempool.PostCheckFunc,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
func (MockMempool) Flush() {}
|
||||
func (MockMempool) FlushAppConn() error { return nil }
|
||||
func (MockMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
|
||||
func (MockMempool) EnableTxsAvailable() {}
|
||||
|
||||
//------------------------------------------------------
|
||||
// blockstore
|
||||
|
||||
@@ -96,7 +43,7 @@ type EvidencePool interface {
|
||||
IsCommitted(types.Evidence) bool
|
||||
}
|
||||
|
||||
// MockMempool is an empty implementation of a Mempool, useful for testing.
|
||||
// MockEvidencePool is an empty implementation of a Mempool, useful for testing.
|
||||
type MockEvidencePool struct{}
|
||||
|
||||
func (m MockEvidencePool) PendingEvidence(int64) []types.Evidence { return nil }
|
||||
|
||||
Reference in New Issue
Block a user