From cac0bfc62bdd2690a1d70c62696509b133f64b8d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 30 Mar 2019 14:39:00 -0400 Subject: [PATCH] comments and rearrange fields --- mempool/mempool.go | 47 ++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 73373a160..a0cb2a031 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -159,23 +159,27 @@ type Mempool struct { proxyMtx sync.Mutex proxyAppConn proxy.AppConnMempool txs *clist.CList // concurrent linked-list of good txs - // map for quick access to txs - // Used in CheckTx to record the tx sender. - txsMap map[[sha256.Size]byte]*clist.CElement - height int64 // the last block Update()'d to - rechecking int32 // for re-checking filtered txs on Update() - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here + preCheck PreCheckFunc + postCheck PostCheckFunc + + // track whether we're rechecking txs. + // these are not protected by a mutex and are expected to be mutated + // in serial (ie. by abci responses which are called in serial). + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + + // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool txsAvailable chan struct{} // fires once for each height, when the mempool is not empty - preCheck PreCheckFunc - postCheck PostCheckFunc + + // map for quick access to txs + // Used in CheckTx to record the tx sender. + txsMap map[[sha256.Size]byte]*clist.CElement // Atomic integers - - // Used to check if the mempool size is bigger than the allowed limit. - // See TxsBytes - txsBytes int64 + height int64 // the last block Update()'d to + rechecking int32 // for re-checking filtered txs on Update() + txsBytes int64 // total size of mempool, in bytes // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -380,12 +384,11 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo // CACHE if !mem.cache.Push(tx) { - // record the sender - e, ok := mem.txsMap[sha256.Sum256(tx)] - // The check is needed because tx may be in cache, but not in the mempool. - // E.g. after we've committed a block, txs are removed from the mempool, - // but not from the cache. - if ok { + // Record a new sender for a tx we've already seen. + // Note it's possible a tx is still in the cache but no longer in the mempool + // (eg. after committing a block, txs are removed from mempool but not cache), + // so we only record the sender for txs still in the mempool. + if e, ok := mem.txsMap[sha256.Sum256(tx)]; ok { memTx := e.Value.(*mempoolTx) if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { // TODO: consider punishing peer for dups, @@ -454,6 +457,10 @@ func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) { func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) { return func(res *abci.Response) { if mem.recheckCursor != nil { + // this should not be possible. + // rechecks should only happen during Update + // after all checktxs were flushed and before + // any new ones happened. return } @@ -743,7 +750,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) { mem.recheckEnd = mem.txs.Back() // Push txs to proxyAppConn - // NOTE: reqResCb may be called concurrently. + // NOTE: globalCb may be called concurrently. for _, tx := range txs { mem.proxyAppConn.CheckTxAsync(tx) }