|
|
|
|
@@ -7,9 +7,6 @@ import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
|
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
|
|
|
cfg "github.com/tendermint/tendermint/config"
|
|
|
|
|
@@ -32,9 +29,8 @@ import (
|
|
|
|
|
// be efficiently accessed by multiple concurrent readers.
|
|
|
|
|
type CListMempool struct {
|
|
|
|
|
// Atomic integers
|
|
|
|
|
height int64 // the last block Update()'d to
|
|
|
|
|
txsBytes int64 // total size of mempool, in bytes
|
|
|
|
|
rechecking int32 // for re-checking filtered txs on Update()
|
|
|
|
|
height int64 // the last block Update()'d to
|
|
|
|
|
txsBytes int64 // total size of mempool, in bytes
|
|
|
|
|
|
|
|
|
|
// notify listeners (ie. consensus) when txs are available
|
|
|
|
|
notifiedTxsAvailable bool
|
|
|
|
|
@@ -42,15 +38,19 @@ type CListMempool struct {
|
|
|
|
|
|
|
|
|
|
config *cfg.MempoolConfig
|
|
|
|
|
|
|
|
|
|
proxyMtx sync.Mutex
|
|
|
|
|
// Exclusive mutex for Update method to prevent concurrent execution of
|
|
|
|
|
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
|
|
|
|
|
updateMtx sync.RWMutex
|
|
|
|
|
preCheck PreCheckFunc
|
|
|
|
|
postCheck PostCheckFunc
|
|
|
|
|
|
|
|
|
|
wal *auto.AutoFile // a log of mempool txs
|
|
|
|
|
txs *clist.CList // concurrent linked-list of good txs
|
|
|
|
|
proxyAppConn proxy.AppConnMempool
|
|
|
|
|
txs *clist.CList // concurrent linked-list of good txs
|
|
|
|
|
preCheck PreCheckFunc
|
|
|
|
|
postCheck PostCheckFunc
|
|
|
|
|
|
|
|
|
|
// Track whether we're rechecking txs.
|
|
|
|
|
// These are not protected by a mutex and are expected to be mutated
|
|
|
|
|
// in serial (ie. by abci responses which are called in serial).
|
|
|
|
|
// These are not protected by a mutex and are expected to be mutated in
|
|
|
|
|
// serial (ie. by abci responses which are called in serial).
|
|
|
|
|
recheckCursor *clist.CElement // next expected response
|
|
|
|
|
recheckEnd *clist.CElement // re-checking stops here
|
|
|
|
|
|
|
|
|
|
@@ -62,9 +62,6 @@ type CListMempool struct {
|
|
|
|
|
// This reduces the pressure on the proxyApp.
|
|
|
|
|
cache txCache
|
|
|
|
|
|
|
|
|
|
// A log of mempool txs
|
|
|
|
|
wal *auto.AutoFile
|
|
|
|
|
|
|
|
|
|
logger log.Logger
|
|
|
|
|
|
|
|
|
|
metrics *Metrics
|
|
|
|
|
@@ -87,7 +84,6 @@ func NewCListMempool(
|
|
|
|
|
proxyAppConn: proxyAppConn,
|
|
|
|
|
txs: clist.New(),
|
|
|
|
|
height: height,
|
|
|
|
|
rechecking: 0,
|
|
|
|
|
recheckCursor: nil,
|
|
|
|
|
recheckEnd: nil,
|
|
|
|
|
logger: log.NewNopLogger(),
|
|
|
|
|
@@ -132,55 +128,64 @@ func WithMetrics(metrics *Metrics) CListMempoolOption {
|
|
|
|
|
return func(mem *CListMempool) { mem.metrics = metrics }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// *panics* if can't create directory or open file.
|
|
|
|
|
// *not thread safe*
|
|
|
|
|
func (mem *CListMempool) InitWAL() {
|
|
|
|
|
walDir := mem.config.WalDir()
|
|
|
|
|
err := tmos.EnsureDir(walDir, 0700)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(errors.Wrap(err, "Error ensuring WAL dir"))
|
|
|
|
|
func (mem *CListMempool) InitWAL() error {
|
|
|
|
|
var (
|
|
|
|
|
walDir = mem.config.WalDir()
|
|
|
|
|
walFile = walDir + "/wal"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const perm = 0700
|
|
|
|
|
if err := tmos.EnsureDir(walDir, perm); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
af, err := auto.OpenAutoFile(walDir + "/wal")
|
|
|
|
|
|
|
|
|
|
af, err := auto.OpenAutoFile(walFile)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(errors.Wrap(err, "Error opening WAL file"))
|
|
|
|
|
return fmt.Errorf("can't open autofile %s: %w", walFile, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mem.wal = af
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mem *CListMempool) CloseWAL() {
|
|
|
|
|
mem.proxyMtx.Lock()
|
|
|
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
|
|
|
|
|
|
if err := mem.wal.Close(); err != nil {
|
|
|
|
|
mem.logger.Error("Error closing WAL", "err", err)
|
|
|
|
|
}
|
|
|
|
|
mem.wal = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) Lock() {
|
|
|
|
|
mem.proxyMtx.Lock()
|
|
|
|
|
mem.updateMtx.Lock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) Unlock() {
|
|
|
|
|
mem.proxyMtx.Unlock()
|
|
|
|
|
mem.updateMtx.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) Size() int {
|
|
|
|
|
return mem.txs.Len()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) TxsBytes() int64 {
|
|
|
|
|
return atomic.LoadInt64(&mem.txsBytes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Lock() must be help by the caller during execution.
|
|
|
|
|
func (mem *CListMempool) FlushAppConn() error {
|
|
|
|
|
return mem.proxyAppConn.FlushSync()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
|
|
|
|
|
func (mem *CListMempool) Flush() {
|
|
|
|
|
mem.proxyMtx.Lock()
|
|
|
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
|
mem.updateMtx.RLock()
|
|
|
|
|
defer mem.updateMtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
_ = atomic.SwapInt64(&mem.txsBytes, 0)
|
|
|
|
|
mem.cache.Reset()
|
|
|
|
|
|
|
|
|
|
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
|
|
|
|
@@ -188,13 +193,17 @@ func (mem *CListMempool) Flush() {
|
|
|
|
|
e.DetachPrev()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mem.txsMap = sync.Map{}
|
|
|
|
|
_ = atomic.SwapInt64(&mem.txsBytes, 0)
|
|
|
|
|
mem.txsMap.Range(func(key, _ interface{}) bool {
|
|
|
|
|
mem.txsMap.Delete(key)
|
|
|
|
|
return true
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TxsFront returns the first transaction in the ordered list for peer
|
|
|
|
|
// goroutines to call .NextWait() on.
|
|
|
|
|
// FIXME: leaking implementation details!
|
|
|
|
|
//
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) TxsFront() *clist.CElement {
|
|
|
|
|
return mem.txs.Front()
|
|
|
|
|
}
|
|
|
|
|
@@ -202,6 +211,8 @@ func (mem *CListMempool) TxsFront() *clist.CElement {
|
|
|
|
|
// TxsWaitChan returns a channel to wait on transactions. It will be closed
|
|
|
|
|
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
|
|
|
|
|
// element)
|
|
|
|
|
//
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
|
|
|
|
|
return mem.txs.WaitChan()
|
|
|
|
|
}
|
|
|
|
|
@@ -210,21 +221,17 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
|
|
|
|
|
// cb: A callback from the CheckTx command.
|
|
|
|
|
// It gets called from another goroutine.
|
|
|
|
|
// CONTRACT: Either cb will get called, or err returned.
|
|
|
|
|
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
|
|
|
|
|
mem.proxyMtx.Lock()
|
|
|
|
|
//
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
|
|
|
|
|
mem.updateMtx.RLock()
|
|
|
|
|
// use defer to unlock mutex because application (*local client*) might panic
|
|
|
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
|
defer mem.updateMtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
memSize = mem.Size()
|
|
|
|
|
txsBytes = mem.TxsBytes()
|
|
|
|
|
txSize = len(tx)
|
|
|
|
|
)
|
|
|
|
|
if memSize >= mem.config.Size ||
|
|
|
|
|
int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
|
|
|
|
|
return ErrMempoolIsFull{
|
|
|
|
|
memSize, mem.config.Size,
|
|
|
|
|
txsBytes, mem.config.MaxTxsBytes}
|
|
|
|
|
txSize := len(tx)
|
|
|
|
|
|
|
|
|
|
if err := mem.isFull(txSize); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The size of the corresponding amino-encoded TxMessage
|
|
|
|
|
@@ -274,7 +281,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
|
|
|
|
|
// END WAL
|
|
|
|
|
|
|
|
|
|
// NOTE: proxyAppConn may error if tx buffer is full
|
|
|
|
|
if err = mem.proxyAppConn.Error(); err != nil {
|
|
|
|
|
if err := mem.proxyAppConn.Error(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -290,7 +297,9 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
|
|
|
|
|
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
|
|
|
|
|
// include this information. If we're not in the midst of a recheck, this function will just return,
|
|
|
|
|
// so the request specific callback can do the work.
|
|
|
|
|
// When rechecking, we don't need the peerID, so the recheck callback happens here.
|
|
|
|
|
//
|
|
|
|
|
// When rechecking, we don't need the peerID, so the recheck callback happens
|
|
|
|
|
// here.
|
|
|
|
|
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
|
|
|
|
|
if mem.recheckCursor == nil {
|
|
|
|
|
return
|
|
|
|
|
@@ -359,6 +368,22 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mem *CListMempool) isFull(txSize int) error {
|
|
|
|
|
var (
|
|
|
|
|
memSize = mem.Size()
|
|
|
|
|
txsBytes = mem.TxsBytes()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
|
|
|
|
|
return ErrMempoolIsFull{
|
|
|
|
|
memSize, mem.config.Size,
|
|
|
|
|
txsBytes, mem.config.MaxTxsBytes,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// callback, which is called after the app checked the tx for the first time.
|
|
|
|
|
//
|
|
|
|
|
// The case where the app checks the tx for the second and subsequent times is
|
|
|
|
|
@@ -376,6 +401,15 @@ func (mem *CListMempool) resCbFirstTime(
|
|
|
|
|
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
|
|
|
|
}
|
|
|
|
|
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
|
|
|
|
// Check mempool isn't full again to reduce the chance of exceeding the
|
|
|
|
|
// limits.
|
|
|
|
|
if err := mem.isFull(len(tx)); err != nil {
|
|
|
|
|
// remove from cache (mempool might have a space later)
|
|
|
|
|
mem.cache.Remove(tx)
|
|
|
|
|
mem.logger.Error(err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
memTx := &mempoolTx{
|
|
|
|
|
height: mem.height,
|
|
|
|
|
gasWanted: r.CheckTx.GasWanted,
|
|
|
|
|
@@ -437,7 +471,6 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
|
|
|
|
}
|
|
|
|
|
if mem.recheckCursor == nil {
|
|
|
|
|
// Done!
|
|
|
|
|
atomic.StoreInt32(&mem.rechecking, 0)
|
|
|
|
|
mem.logger.Info("Done rechecking txs")
|
|
|
|
|
|
|
|
|
|
// incase the recheck removed all txs
|
|
|
|
|
@@ -450,6 +483,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) TxsAvailable() <-chan struct{} {
|
|
|
|
|
return mem.txsAvailable
|
|
|
|
|
}
|
|
|
|
|
@@ -468,17 +502,15 @@ func (mem *CListMempool) notifyTxsAvailable() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
|
|
|
|
mem.proxyMtx.Lock()
|
|
|
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
|
mem.updateMtx.RLock()
|
|
|
|
|
defer mem.updateMtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
|
|
|
|
// TODO: Something better?
|
|
|
|
|
time.Sleep(time.Millisecond * 10)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var totalBytes int64
|
|
|
|
|
var totalGas int64
|
|
|
|
|
var (
|
|
|
|
|
totalBytes int64
|
|
|
|
|
totalGas int64
|
|
|
|
|
)
|
|
|
|
|
// TODO: we will get a performance boost if we have a good estimate of avg
|
|
|
|
|
// size per tx, and set the initial capacity based off of that.
|
|
|
|
|
// txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize))
|
|
|
|
|
@@ -505,19 +537,15 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
|
|
|
|
return txs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Safe for concurrent use by multiple goroutines.
|
|
|
|
|
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
|
|
|
|
|
mem.proxyMtx.Lock()
|
|
|
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
|
mem.updateMtx.RLock()
|
|
|
|
|
defer mem.updateMtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
if max < 0 {
|
|
|
|
|
max = mem.txs.Len()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
|
|
|
|
// TODO: Something better?
|
|
|
|
|
time.Sleep(time.Millisecond * 10)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max))
|
|
|
|
|
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
|
|
|
|
|
memTx := e.Value.(*mempoolTx)
|
|
|
|
|
@@ -526,6 +554,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
|
|
|
|
|
return txs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Lock() must be help by the caller during execution.
|
|
|
|
|
func (mem *CListMempool) Update(
|
|
|
|
|
height int64,
|
|
|
|
|
txs types.Txs,
|
|
|
|
|
@@ -593,7 +622,6 @@ func (mem *CListMempool) recheckTxs() {
|
|
|
|
|
panic("recheckTxs is called, but the mempool is empty")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
atomic.StoreInt32(&mem.rechecking, 1)
|
|
|
|
|
mem.recheckCursor = mem.txs.Front()
|
|
|
|
|
mem.recheckEnd = mem.txs.Back()
|
|
|
|
|
|
|
|
|
|
|