comments and rearrange fields

This commit is contained in:
Ethan Buchman
2019-03-30 14:39:00 -04:00
parent bcb0b348e7
commit cac0bfc62b

View File

@@ -159,23 +159,27 @@ type Mempool struct {
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
// map for quick access to txs
// Used in CheckTx to record the tx sender.
txsMap map[[sha256.Size]byte]*clist.CElement
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
preCheck PreCheckFunc
postCheck PostCheckFunc
// track whether we're rechecking txs.
// these are not protected by a mutex and are expected to be mutated
// in serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
preCheck PreCheckFunc
postCheck PostCheckFunc
// map for quick access to txs
// Used in CheckTx to record the tx sender.
txsMap map[[sha256.Size]byte]*clist.CElement
// Atomic integers
// Used to check if the mempool size is bigger than the allowed limit.
// See TxsBytes
txsBytes int64
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
txsBytes int64 // total size of mempool, in bytes
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
@@ -380,12 +384,11 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
// CACHE
if !mem.cache.Push(tx) {
// record the sender
e, ok := mem.txsMap[sha256.Sum256(tx)]
// The check is needed because tx may be in cache, but not in the mempool.
// E.g. after we've committed a block, txs are removed from the mempool,
// but not from the cache.
if ok {
// Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap[sha256.Sum256(tx)]; ok {
memTx := e.Value.(*mempoolTx)
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
// TODO: consider punishing peer for dups,
@@ -454,6 +457,10 @@ func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) {
func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
// this should not be possible.
// rechecks should only happen during Update
// after all checktxs were flushed and before
// any new ones happened.
return
}
@@ -743,7 +750,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
mem.recheckEnd = mem.txs.Back()
// Push txs to proxyAppConn
// NOTE: reqResCb may be called concurrently.
// NOTE: globalCb may be called concurrently.
for _, tx := range txs {
mem.proxyAppConn.CheckTxAsync(tx)
}