mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-06 05:25:35 +00:00
mempool: allow ReapX and CheckTx functions to run in parallel
Allow the ReapX and CheckTx functions to run in parallel, making it impossible to block certain proposers from creating a new block. Closes: #2972
This commit is contained in:
committed by
Tess Rinearson
parent
29035985c6
commit
62018d90df
@@ -18,6 +18,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
- Go API
|
||||
|
||||
- [privval] [\#4744](https://github.com/tendermint/tendermint/pull/4744) Remove deprecated `OldFilePV` (@melekes)
|
||||
- [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Modify `Mempool#InitWAL` to return an error (@melekes)
|
||||
|
||||
- Blockchain Protocol
|
||||
|
||||
@@ -32,6 +33,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
- nostrip: don't strip debugging symbols nor DWARF tables.
|
||||
- cleveldb: use cleveldb as db backend instead of goleveldb.
|
||||
- race: pass -race to go build and enable data race detection.
|
||||
- [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Allow ReapX and CheckTx functions to run in parallel (@melekes)
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ func EnsureDir(dir string, mode os.FileMode) error {
|
||||
if _, err := os.Stat(dir); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(dir, mode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create directory %v. %v", dir, err)
|
||||
return fmt.Errorf("could not create directory %v: %w", dir, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -7,9 +7,6 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
@@ -32,9 +29,8 @@ import (
|
||||
// be efficiently accessed by multiple concurrent readers.
|
||||
type CListMempool struct {
|
||||
// Atomic integers
|
||||
height int64 // the last block Update()'d to
|
||||
txsBytes int64 // total size of mempool, in bytes
|
||||
rechecking int32 // for re-checking filtered txs on Update()
|
||||
height int64 // the last block Update()'d to
|
||||
txsBytes int64 // total size of mempool, in bytes
|
||||
|
||||
// notify listeners (ie. consensus) when txs are available
|
||||
notifiedTxsAvailable bool
|
||||
@@ -42,15 +38,19 @@ type CListMempool struct {
|
||||
|
||||
config *cfg.MempoolConfig
|
||||
|
||||
proxyMtx sync.Mutex
|
||||
// Exclusive mutex for Update method to prevent concurrent execution of
|
||||
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
|
||||
updateMtx sync.RWMutex
|
||||
preCheck PreCheckFunc
|
||||
postCheck PostCheckFunc
|
||||
|
||||
wal *auto.AutoFile // a log of mempool txs
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
preCheck PreCheckFunc
|
||||
postCheck PostCheckFunc
|
||||
|
||||
// Track whether we're rechecking txs.
|
||||
// These are not protected by a mutex and are expected to be mutated
|
||||
// in serial (ie. by abci responses which are called in serial).
|
||||
// These are not protected by a mutex and are expected to be mutated in
|
||||
// serial (ie. by abci responses which are called in serial).
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
recheckEnd *clist.CElement // re-checking stops here
|
||||
|
||||
@@ -62,9 +62,6 @@ type CListMempool struct {
|
||||
// This reduces the pressure on the proxyApp.
|
||||
cache txCache
|
||||
|
||||
// A log of mempool txs
|
||||
wal *auto.AutoFile
|
||||
|
||||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
@@ -87,7 +84,6 @@ func NewCListMempool(
|
||||
proxyAppConn: proxyAppConn,
|
||||
txs: clist.New(),
|
||||
height: height,
|
||||
rechecking: 0,
|
||||
recheckCursor: nil,
|
||||
recheckEnd: nil,
|
||||
logger: log.NewNopLogger(),
|
||||
@@ -132,55 +128,64 @@ func WithMetrics(metrics *Metrics) CListMempoolOption {
|
||||
return func(mem *CListMempool) { mem.metrics = metrics }
|
||||
}
|
||||
|
||||
// *panics* if can't create directory or open file.
|
||||
// *not thread safe*
|
||||
func (mem *CListMempool) InitWAL() {
|
||||
walDir := mem.config.WalDir()
|
||||
err := tmos.EnsureDir(walDir, 0700)
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "Error ensuring WAL dir"))
|
||||
func (mem *CListMempool) InitWAL() error {
|
||||
var (
|
||||
walDir = mem.config.WalDir()
|
||||
walFile = walDir + "/wal"
|
||||
)
|
||||
|
||||
const perm = 0700
|
||||
if err := tmos.EnsureDir(walDir, perm); err != nil {
|
||||
return err
|
||||
}
|
||||
af, err := auto.OpenAutoFile(walDir + "/wal")
|
||||
|
||||
af, err := auto.OpenAutoFile(walFile)
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "Error opening WAL file"))
|
||||
return fmt.Errorf("can't open autofile %s: %w", walFile, err)
|
||||
}
|
||||
|
||||
mem.wal = af
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mem *CListMempool) CloseWAL() {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
if err := mem.wal.Close(); err != nil {
|
||||
mem.logger.Error("Error closing WAL", "err", err)
|
||||
}
|
||||
mem.wal = nil
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) Lock() {
|
||||
mem.proxyMtx.Lock()
|
||||
mem.updateMtx.Lock()
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) Unlock() {
|
||||
mem.proxyMtx.Unlock()
|
||||
mem.updateMtx.Unlock()
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) Size() int {
|
||||
return mem.txs.Len()
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) TxsBytes() int64 {
|
||||
return atomic.LoadInt64(&mem.txsBytes)
|
||||
}
|
||||
|
||||
// Lock() must be held by the caller during execution.
|
||||
func (mem *CListMempool) FlushAppConn() error {
|
||||
return mem.proxyAppConn.FlushSync()
|
||||
}
|
||||
|
||||
// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
|
||||
func (mem *CListMempool) Flush() {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
mem.updateMtx.RLock()
|
||||
defer mem.updateMtx.RUnlock()
|
||||
|
||||
_ = atomic.SwapInt64(&mem.txsBytes, 0)
|
||||
mem.cache.Reset()
|
||||
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
@@ -188,13 +193,17 @@ func (mem *CListMempool) Flush() {
|
||||
e.DetachPrev()
|
||||
}
|
||||
|
||||
mem.txsMap = sync.Map{}
|
||||
_ = atomic.SwapInt64(&mem.txsBytes, 0)
|
||||
mem.txsMap.Range(func(key, _ interface{}) bool {
|
||||
mem.txsMap.Delete(key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// TxsFront returns the first transaction in the ordered list for peer
|
||||
// goroutines to call .NextWait() on.
|
||||
// FIXME: leaking implementation details!
|
||||
//
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) TxsFront() *clist.CElement {
|
||||
return mem.txs.Front()
|
||||
}
|
||||
@@ -202,6 +211,8 @@ func (mem *CListMempool) TxsFront() *clist.CElement {
|
||||
// TxsWaitChan returns a channel to wait on transactions. It will be closed
|
||||
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
|
||||
// element)
|
||||
//
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
|
||||
return mem.txs.WaitChan()
|
||||
}
|
||||
@@ -210,21 +221,17 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
|
||||
// cb: A callback from the CheckTx command.
|
||||
// It gets called from another goroutine.
|
||||
// CONTRACT: Either cb will get called, or err returned.
|
||||
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
|
||||
mem.proxyMtx.Lock()
|
||||
//
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
|
||||
mem.updateMtx.RLock()
|
||||
// use defer to unlock mutex because application (*local client*) might panic
|
||||
defer mem.proxyMtx.Unlock()
|
||||
defer mem.updateMtx.RUnlock()
|
||||
|
||||
var (
|
||||
memSize = mem.Size()
|
||||
txsBytes = mem.TxsBytes()
|
||||
txSize = len(tx)
|
||||
)
|
||||
if memSize >= mem.config.Size ||
|
||||
int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
|
||||
return ErrMempoolIsFull{
|
||||
memSize, mem.config.Size,
|
||||
txsBytes, mem.config.MaxTxsBytes}
|
||||
txSize := len(tx)
|
||||
|
||||
if err := mem.isFull(txSize); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The size of the corresponding amino-encoded TxMessage
|
||||
@@ -274,7 +281,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
|
||||
// END WAL
|
||||
|
||||
// NOTE: proxyAppConn may error if tx buffer is full
|
||||
if err = mem.proxyAppConn.Error(); err != nil {
|
||||
if err := mem.proxyAppConn.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -290,7 +297,9 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
|
||||
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
|
||||
// include this information. If we're not in the midst of a recheck, this function will just return,
|
||||
// so the request specific callback can do the work.
|
||||
// When rechecking, we don't need the peerID, so the recheck callback happens here.
|
||||
//
|
||||
// When rechecking, we don't need the peerID, so the recheck callback happens
|
||||
// here.
|
||||
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
|
||||
if mem.recheckCursor == nil {
|
||||
return
|
||||
@@ -359,6 +368,22 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC
|
||||
}
|
||||
}
|
||||
|
||||
func (mem *CListMempool) isFull(txSize int) error {
|
||||
var (
|
||||
memSize = mem.Size()
|
||||
txsBytes = mem.TxsBytes()
|
||||
)
|
||||
|
||||
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
|
||||
return ErrMempoolIsFull{
|
||||
memSize, mem.config.Size,
|
||||
txsBytes, mem.config.MaxTxsBytes,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// callback, which is called after the app checked the tx for the first time.
|
||||
//
|
||||
// The case where the app checks the tx for the second and subsequent times is
|
||||
@@ -376,6 +401,15 @@ func (mem *CListMempool) resCbFirstTime(
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
}
|
||||
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
||||
// Check mempool isn't full again to reduce the chance of exceeding the
|
||||
// limits.
|
||||
if err := mem.isFull(len(tx)); err != nil {
|
||||
// remove from cache (mempool might have a space later)
|
||||
mem.cache.Remove(tx)
|
||||
mem.logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
memTx := &mempoolTx{
|
||||
height: mem.height,
|
||||
gasWanted: r.CheckTx.GasWanted,
|
||||
@@ -437,7 +471,6 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
||||
}
|
||||
if mem.recheckCursor == nil {
|
||||
// Done!
|
||||
atomic.StoreInt32(&mem.rechecking, 0)
|
||||
mem.logger.Info("Done rechecking txs")
|
||||
|
||||
// in case the recheck removed all txs
|
||||
@@ -450,6 +483,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
||||
}
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) TxsAvailable() <-chan struct{} {
|
||||
return mem.txsAvailable
|
||||
}
|
||||
@@ -468,17 +502,15 @@ func (mem *CListMempool) notifyTxsAvailable() {
|
||||
}
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
mem.updateMtx.RLock()
|
||||
defer mem.updateMtx.RUnlock()
|
||||
|
||||
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
||||
// TODO: Something better?
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
var totalGas int64
|
||||
var (
|
||||
totalBytes int64
|
||||
totalGas int64
|
||||
)
|
||||
// TODO: we will get a performance boost if we have a good estimate of avg
|
||||
// size per tx, and set the initial capacity based off of that.
|
||||
// txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize))
|
||||
@@ -505,19 +537,15 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
|
||||
return txs
|
||||
}
|
||||
|
||||
// Safe for concurrent use by multiple goroutines.
|
||||
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
mem.updateMtx.RLock()
|
||||
defer mem.updateMtx.RUnlock()
|
||||
|
||||
if max < 0 {
|
||||
max = mem.txs.Len()
|
||||
}
|
||||
|
||||
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
||||
// TODO: Something better?
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max))
|
||||
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
@@ -526,6 +554,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
|
||||
return txs
|
||||
}
|
||||
|
||||
// Lock() must be held by the caller during execution.
|
||||
func (mem *CListMempool) Update(
|
||||
height int64,
|
||||
txs types.Txs,
|
||||
@@ -593,7 +622,6 @@ func (mem *CListMempool) recheckTxs() {
|
||||
panic("recheckTxs is called, but the mempool is empty")
|
||||
}
|
||||
|
||||
atomic.StoreInt32(&mem.rechecking, 1)
|
||||
mem.recheckCursor = mem.txs.Front()
|
||||
mem.recheckEnd = mem.txs.Back()
|
||||
|
||||
|
||||
@@ -6,19 +6,18 @@
|
||||
// safely by calling .NextWait() on each element.
|
||||
|
||||
// So we have several go-routines:
|
||||
// 1. Consensus calling Update() and Reap() synchronously
|
||||
// 1. Consensus calling Update() and ReapMaxBytesMaxGas() synchronously
|
||||
// 2. Many mempool reactor's peer routines calling CheckTx()
|
||||
// 3. Many mempool reactor's peer routines traversing the txs linked list
|
||||
// 4. Another goroutine calling GarbageCollectTxs() periodically
|
||||
|
||||
// To manage these goroutines, there are three methods of locking.
|
||||
// 1. Mutations to the linked-list are protected by an internal mtx (CList is goroutine-safe)
|
||||
// 2. Mutations to the linked-list elements are atomic
|
||||
// 3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
|
||||
// 3. CheckTx() and/or ReapMaxBytesMaxGas() calls can be paused upon Update(), protected by .updateMtx
|
||||
|
||||
// Garbage collection of old elements from mempool.txs is handled via
|
||||
// the DetachPrev() call, which makes old elements not reachable by
|
||||
// peer broadcastTxRoutine() automatically garbage collected.
|
||||
// Garbage collection of old elements from mempool.txs is handled via the
|
||||
// DetachPrev() call, which makes old elements not reachable by peer
|
||||
// broadcastTxRoutine().
|
||||
|
||||
// TODO: Better handle abci client errors. (make it automatically handle connection errors)
|
||||
package mempool
|
||||
|
||||
@@ -37,7 +37,7 @@ type Mempool interface {
|
||||
|
||||
// Update informs the mempool that the given txs were committed and can be discarded.
|
||||
// NOTE: this should be called *after* block is committed by consensus.
|
||||
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
||||
// NOTE: Lock/Unlock must be managed by caller
|
||||
Update(
|
||||
blockHeight int64,
|
||||
blockTxs types.Txs,
|
||||
@@ -48,6 +48,7 @@ type Mempool interface {
|
||||
|
||||
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
|
||||
// done. E.g. from CheckTx.
|
||||
// NOTE: Lock/Unlock must be managed by caller
|
||||
FlushAppConn() error
|
||||
|
||||
// Flush removes all transactions from the mempool and cache
|
||||
@@ -68,8 +69,9 @@ type Mempool interface {
|
||||
// TxsBytes returns the total size of all txs in the mempool.
|
||||
TxsBytes() int64
|
||||
|
||||
// InitWAL creates a directory for the WAL file and opens a file itself.
|
||||
InitWAL()
|
||||
// InitWAL creates a directory for the WAL file and opens a file itself. If
|
||||
// there is an error, it may wrap an *os.PathError (inspect it with errors.As).
|
||||
InitWAL() error
|
||||
|
||||
// CloseWAL closes and discards the underlying WAL file.
|
||||
// Any further writes will not be relayed to disk.
|
||||
|
||||
@@ -38,5 +38,5 @@ func (Mempool) TxsBytes() int64 { return 0 }
|
||||
func (Mempool) TxsFront() *clist.CElement { return nil }
|
||||
func (Mempool) TxsWaitChan() <-chan struct{} { return nil }
|
||||
|
||||
func (Mempool) InitWAL() {}
|
||||
func (Mempool) CloseWAL() {}
|
||||
func (Mempool) InitWAL() error { return nil }
|
||||
func (Mempool) CloseWAL() {}
|
||||
|
||||
@@ -773,7 +773,10 @@ func (n *Node) OnStart() error {
|
||||
n.isListening = true
|
||||
|
||||
if n.config.Mempool.WalEnabled() {
|
||||
n.mempool.InitWAL() // no need to have the mempool wal during tests
|
||||
err = n.mempool.InitWAL()
|
||||
if err != nil {
|
||||
return fmt.Errorf("init mempool WAL: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start the switch (the P2P server).
|
||||
|
||||
Reference in New Issue
Block a user