diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 8fd8b335c..bbff3b98f 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -103,8 +103,6 @@ func NewCListMempool( return mempool } -// EnableTxsAvailable initializes the TxsAvailable channel, -// ensuring it will trigger once every height when transactions are available. // NOTE: not thread safe - should only be called once, on startup func (mem *CListMempool) EnableTxsAvailable() { mem.txsAvailable = make(chan struct{}, 1) @@ -161,33 +159,26 @@ func (mem *CListMempool) CloseWAL() { mem.wal = nil } -// Lock locks the mempool. The consensus must be able to hold lock to safely update. func (mem *CListMempool) Lock() { mem.proxyMtx.Lock() } -// Unlock unlocks the mempool. func (mem *CListMempool) Unlock() { mem.proxyMtx.Unlock() } -// Size returns the number of transactions in the mempool. func (mem *CListMempool) Size() int { return mem.txs.Len() } -// TxsBytes returns the total size of all txs in the mempool. func (mem *CListMempool) TxsBytes() int64 { return atomic.LoadInt64(&mem.txsBytes) } -// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are -// done. E.g. from CheckTx. func (mem *CListMempool) FlushAppConn() error { return mem.proxyAppConn.FlushSync() } -// Flush removes all transactions from the mempool and cache func (mem *CListMempool) Flush() { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -216,19 +207,14 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.WaitChan() } -// CheckTx executes a new transaction against the application to determine its validity -// and whether it should be added to the mempool. // It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { - return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID}) + return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}) } -// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx. -// Currently this metadata is the peer who sent it, -// used to prevent the tx from being gossiped back to them. func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { mem.proxyMtx.Lock() // use defer to unlock mutex because application (*local client*) might panic @@ -266,7 +252,7 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t // so we only record the sender for txs still in the mempool. if e, ok := mem.txsMap.Load(txKey(tx)); ok { memTx := e.(*clist.CElement).Value.(*mempoolTx) - if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { + if _, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true); loaded { // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, // but they can spam the same tx with little cost to them atm. @@ -297,7 +283,7 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t } reqRes := mem.proxyAppConn.CheckTxAsync(tx) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb)) + reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, cb)) return nil } @@ -457,9 +443,6 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } } -// TxsAvailable returns a channel which fires once for every height, -// and only when transactions are available in the mempool. -// NOTE: the returned channel may be nil if EnableTxsAvailable was not called. func (mem *CListMempool) TxsAvailable() <-chan struct{} { return mem.txsAvailable } @@ -478,10 +461,6 @@ func (mem *CListMempool) notifyTxsAvailable() { } } -// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total -// with the condition that the total gasWanted must be less than maxGas. -// If both maxes are negative, there is no cap on the size of all returned -// transactions (~ all available transactions). func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -519,9 +498,6 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { return txs } -// ReapMaxTxs reaps up to max transactions from the mempool. -// If max is negative, there is no cap on the size of all returned -// transactions (~ all available transactions). func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -543,9 +519,6 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { return txs } -// Update informs the mempool that the given txs were committed and can be discarded. -// NOTE: this should be called *after* block is committed by consensus. -// NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *CListMempool) Update( height int64, txs types.Txs, diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index e3f98230d..69d4eb68c 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -68,7 +68,7 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs { txs := make(types.Txs, count) - txInfo := TxInfo{PeerID: peerID} + txInfo := TxInfo{SenderID: peerID} for i := 0; i < count; i++ { txBytes := make([]byte, 20) txs[i] = txBytes @@ -541,7 +541,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) { tx := txs[int(txNum)] // this will err with ErrTxInCache many times ... - mempool.CheckTxWithInfo(tx, nil, TxInfo{PeerID: uint16(peerID)}) + mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)}) } err := mempool.FlushAppConn() require.NoError(t, err) diff --git a/mempool/mempool.go b/mempool/mempool.go index 8f3c0e63a..6a789f26d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -4,31 +4,77 @@ import ( "fmt" abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/types" ) // Mempool defines the mempool interface. +// // Updates to the mempool need to be synchronized with committing a block so // apps can reset their transient state on Commit. type Mempool interface { - Lock() - Unlock() + // CheckTx executes a new transaction against the application to determine + // its validity and whether it should be added to the mempool. + CheckTx(tx types.Tx, callback func(*abci.Response)) error - Size() int - - CheckTx(types.Tx, func(*abci.Response)) error - CheckTxWithInfo(types.Tx, func(*abci.Response), TxInfo) error + // CheckTxWithInfo performs the same operation as CheckTx, but with extra + // meta data about the tx. + // Currently this metadata is the peer who sent it, used to prevent the tx + // from being gossiped back to them. + CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes + // bytes total with the condition that the total gasWanted must be less than + // maxGas. + // If both maxes are negative, there is no cap on the size of all returned + // transactions (~ all available transactions). ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs - Update(int64, types.Txs, PreCheckFunc, PostCheckFunc) error - Flush() + // ReapMaxTxs reaps up to max transactions from the mempool. + // If max is negative, there is no cap on the size of all returned + // transactions (~ all available transactions). + ReapMaxTxs(max int) types.Txs + + // UNSAFE + TxsWaitChan() <-chan struct{} + TxsFront() *clist.CElement + + // Lock locks the mempool. The consensus must be able to hold lock to safely update. + Lock() + + // Unlock unlocks the mempool. + Unlock() + + // Update informs the mempool that the given txs were committed and can be discarded. + // NOTE: this should be called *after* block is committed by consensus. + // NOTE: unsafe; Lock/Unlock must be managed by caller + Update(blockHeight int64, blockTxs types.Txs, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error + + // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are + // done. E.g. from CheckTx. FlushAppConn() error + // Flush removes all transactions from the mempool and cache + Flush() + + // TxsAvailable returns a channel which fires once for every height, + // and only when transactions are available in the mempool. + // NOTE: the returned channel may be nil if EnableTxsAvailable was not called. TxsAvailable() <-chan struct{} + + // EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will + // trigger once every height when transactions are available. EnableTxsAvailable() + + // Size returns the number of transactions in the mempool. + Size() int + + // TxsBytes returns the total size of all txs in the mempool. + TxsBytes() int64 } +//-------------------------------------------------------------------------------- + // PreCheckFunc is an optional filter executed before CheckTx and rejects // transaction if false is returned. An example would be to ensure that a // transaction doesn't exceeded the block size. @@ -44,7 +90,7 @@ type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error type TxInfo struct { // We don't use p2p.ID here because it's too big. The gain is to store max 2 // bytes with each tx to identify the sender rather than 20 bytes. - PeerID uint16 + SenderID uint16 } //-------------------------------------------------------------------------------- diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go index 97cc79e6a..0a94dde06 100644 --- a/mempool/mock/mempool.go +++ b/mempool/mock/mempool.go @@ -2,6 +2,7 @@ package mock import ( abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/clist" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/types" ) @@ -22,6 +23,7 @@ func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response), return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } +func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (Mempool) Update( _ int64, _ types.Txs, @@ -34,3 +36,7 @@ func (Mempool) Flush() {} func (Mempool) FlushAppConn() error { return nil } func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } func (Mempool) EnableTxsAvailable() {} +func (Mempool) TxsBytes() int64 { return 0 } + +func (Mempool) TxsFront() *clist.CElement { return nil } +func (Mempool) TxsWaitChan() <-chan struct{} { return nil } diff --git a/mempool/reactor.go b/mempool/reactor.go index d7581ae68..65631b0cc 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -31,25 +31,13 @@ const ( maxActiveIDs = math.MaxUint16 ) -// MempoolWithWait extends the standard Mempool interface to allow reactor to -// wait for transactions and iterate on them once there are any. Also it -// includes ReapMaxTxs function, which is useful for testing. -// -// UNSTABLE -type MempoolWithWait interface { - Mempool - TxsFront() *clist.CElement - TxsWaitChan() <-chan struct{} - ReapMaxTxs(n int) types.Txs -} - -// MempoolReactor handles mempool tx broadcasting amongst peers. +// Reactor handles mempool tx broadcasting amongst peers. // It maintains a map from peer ID to counter, to prevent gossiping txs to the // peers you received it from. -type MempoolReactor struct { +type Reactor struct { p2p.BaseReactor config *cfg.MempoolConfig - Mempool MempoolWithWait + mempool Mempool ids *mempoolIDs } @@ -116,14 +104,14 @@ func newMempoolIDs() *mempoolIDs { } } -// NewMempoolReactor returns a new MempoolReactor with the given config and mempool. -func NewMempoolReactor(config *cfg.MempoolConfig, mempool MempoolWithWait) *MempoolReactor { - memR := &MempoolReactor{ +// NewReactor returns a new Reactor with the given config and mempool. +func NewReactor(config *cfg.MempoolConfig, mempool Mempool) *Reactor { + memR := &Reactor{ config: config, - Mempool: mempool, + mempool: mempool, ids: newMempoolIDs(), } - memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR) + memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR) return memR } @@ -131,18 +119,18 @@ type mempoolWithLogger interface { SetLogger(log.Logger) } -// SetLogger sets the Logger on the reactor and the underlying Mempool. -func (memR *MempoolReactor) SetLogger(l log.Logger) { +// SetLogger sets the Logger on the reactor and the underlying mempool. +func (memR *Reactor) SetLogger(l log.Logger) { memR.Logger = l // set mempoolWithLogger if mempool supports it - if mem, ok := memR.Mempool.(mempoolWithLogger); ok { + if mem, ok := memR.mempool.(mempoolWithLogger); ok { mem.SetLogger(l) } } // OnStart implements p2p.BaseReactor. -func (memR *MempoolReactor) OnStart() error { +func (memR *Reactor) OnStart() error { if !memR.config.Broadcast { memR.Logger.Info("Tx broadcasting is disabled") } @@ -151,7 +139,7 @@ func (memR *MempoolReactor) OnStart() error { // GetChannels implements Reactor. // It returns the list of channels for this reactor. -func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { +func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ { ID: MempoolChannel, @@ -162,20 +150,20 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. -func (memR *MempoolReactor) AddPeer(peer p2p.Peer) { +func (memR *Reactor) AddPeer(peer p2p.Peer) { memR.ids.ReserveForPeer(peer) go memR.broadcastTxRoutine(peer) } // RemovePeer implements Reactor. -func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { +func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { memR.ids.Reclaim(peer) // broadcast routine checks if peer is gone and returns } // Receive implements Reactor. // It adds any received transactions to the mempool. -func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { +func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { msg, err := decodeMsg(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) @@ -187,7 +175,7 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: peerID := memR.ids.GetForPeer(src) - err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID}) + err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) } @@ -203,7 +191,7 @@ type PeerState interface { } // Send new mempool txs to peer. -func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { +func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { return } @@ -220,8 +208,8 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { // start from the beginning. if next == nil { select { - case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available - if next = memR.Mempool.TxsFront(); next == nil { + case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available + if next = memR.mempool.TxsFront(); next == nil { continue } case <-peer.Quit(): @@ -275,7 +263,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { //----------------------------------------------------------------------------- // Messages -// MempoolMessage is a message sent or received by the MempoolReactor. +// MempoolMessage is a message sent or received by the Reactor. type MempoolMessage interface{} func RegisterMempoolMessages(cdc *amino.Codec) { diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index c9cf49809..40bcf83ef 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -43,8 +43,8 @@ func mempoolLogger() log.Logger { } // connect N mempool reactors through N switches -func makeAndConnectMempoolReactors(config *cfg.Config, N int) []*MempoolReactor { - reactors := make([]*MempoolReactor, N) +func makeAndConnectReactors(config *cfg.Config, N int) []*Reactor { + reactors := make([]*Reactor, N) logger := mempoolLogger() for i := 0; i < N; i++ { app := kvstore.NewKVStoreApplication() @@ -52,7 +52,7 @@ func makeAndConnectMempoolReactors(config *cfg.Config, N int) []*MempoolReactor mempool, cleanup := newMempoolWithApp(cc) defer cleanup() - reactors[i] = NewMempoolReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states reactors[i].SetLogger(logger.With("validator", i)) } @@ -65,7 +65,7 @@ func makeAndConnectMempoolReactors(config *cfg.Config, N int) []*MempoolReactor } // wait for all txs on all reactors -func waitForTxs(t *testing.T, txs types.Txs, reactors []*MempoolReactor) { +func waitForTxs(t *testing.T, txs types.Txs, reactors []*Reactor) { // wait for the txs in all mempools wg := new(sync.WaitGroup) for i := 0; i < len(reactors); i++ { @@ -88,9 +88,9 @@ func waitForTxs(t *testing.T, txs types.Txs, reactors []*MempoolReactor) { } // wait for all txs on a single mempool -func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int, reactors []*MempoolReactor) { +func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int, reactors []*Reactor) { - mempool := reactors[reactorIdx].Mempool + mempool := reactors[reactorIdx].mempool for mempool.Size() != len(txs) { time.Sleep(time.Millisecond * 100) } @@ -103,9 +103,9 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int } // ensure no txs on reactor after some timeout -func ensureNoTxs(t *testing.T, reactor *MempoolReactor, timeout time.Duration) { +func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { time.Sleep(timeout) // wait for the txs in all mempools - assert.Zero(t, reactor.Mempool.Size()) + assert.Zero(t, reactor.mempool.Size()) } const ( @@ -116,7 +116,7 @@ const ( func TestReactorBroadcastTxMessage(t *testing.T) { config := cfg.TestConfig() const N = 4 - reactors := makeAndConnectMempoolReactors(config, N) + reactors := makeAndConnectReactors(config, N) defer func() { for _, r := range reactors { r.Stop() @@ -130,14 +130,14 @@ func TestReactorBroadcastTxMessage(t *testing.T) { // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others - txs := checkTxs(t, reactors[0].Mempool, NUM_TXS, UnknownPeerID) + txs := checkTxs(t, reactors[0].mempool, NUM_TXS, UnknownPeerID) waitForTxs(t, txs, reactors) } func TestReactorNoBroadcastToSender(t *testing.T) { config := cfg.TestConfig() const N = 2 - reactors := makeAndConnectMempoolReactors(config, N) + reactors := makeAndConnectReactors(config, N) defer func() { for _, r := range reactors { r.Stop() @@ -146,7 +146,7 @@ func TestReactorNoBroadcastToSender(t *testing.T) { // send a bunch of txs to the first reactor's mempool, claiming it came from peer // ensure peer gets no txs - checkTxs(t, reactors[0].Mempool, NUM_TXS, 1) + checkTxs(t, reactors[0].mempool, NUM_TXS, 1) ensureNoTxs(t, reactors[1], 100*time.Millisecond) } @@ -157,7 +157,7 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { config := cfg.TestConfig() const N = 2 - reactors := makeAndConnectMempoolReactors(config, N) + reactors := makeAndConnectReactors(config, N) defer func() { for _, r := range reactors { r.Stop() @@ -180,7 +180,7 @@ func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { config := cfg.TestConfig() const N = 2 - reactors := makeAndConnectMempoolReactors(config, N) + reactors := makeAndConnectReactors(config, N) // stop reactors for _, r := range reactors { diff --git a/node/node.go b/node/node.go index 49fd17a19..6f4d64242 100644 --- a/node/node.go +++ b/node/node.go @@ -159,7 +159,7 @@ type Node struct { stateDB dbm.DB blockStore *bc.BlockStore // store the blockchain to disk bcReactor *bc.BlockchainReactor // for fast-syncing - mempoolReactor *mempl.MempoolReactor // for gossipping transactions + mempoolReactor *mempl.Reactor // for gossipping transactions mempool *mempl.CListMempool consensusState *cs.ConsensusState // latest consensus state consensusReactor *cs.ConsensusReactor // for participating in the consensus @@ -320,7 +320,7 @@ func NewNode(config *cfg.Config, csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) - // Make MempoolReactor + // Make Mempool Reactor mempool := mempl.NewCListMempool( config.Mempool, proxyApp.Mempool(), @@ -333,7 +333,7 @@ func NewNode(config *cfg.Config, if config.Mempool.WalEnabled() { mempool.InitWAL() // no need to have the mempool wal during tests } - mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) + mempoolReactor := mempl.NewReactor(config.Mempool, mempool) mempoolReactor.SetLogger(mempoolLogger) if config.Consensus.WaitForTxs() { @@ -800,12 +800,12 @@ func (n *Node) ConsensusReactor() *cs.ConsensusReactor { return n.consensusReactor } -// MempoolReactor returns the Node's MempoolReactor. -func (n *Node) MempoolReactor() *mempl.MempoolReactor { +// MempoolReactor returns the Node's mempool reactor. +func (n *Node) MempoolReactor() *mempl.Reactor { return n.mempoolReactor } -// Mempool returns the Node's Mempool. +// Mempool returns the Node's mempool. func (n *Node) Mempool() *mempl.CListMempool { return n.mempool } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index ab2316a38..e19dbf969 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -49,14 +49,6 @@ type peers interface { Peers() p2p.IPeerSet } -// Mempool extends the standard Mempool interface to allow getting -// total txs size. ReapMaxTxs is used by UnconfirmedTxs to reap N transactions. -type Mempool interface { - mempl.Mempool - ReapMaxTxs(n int) types.Txs - TxsBytes() int64 -} - //---------------------------------------------- // These package level globals come with setters // that are expected to be called only once, on startup @@ -80,7 +72,7 @@ var ( txIndexer txindex.TxIndexer consensusReactor *consensus.ConsensusReactor eventBus *types.EventBus // thread safe - mempool Mempool + mempool mempl.Mempool logger log.Logger @@ -95,7 +87,7 @@ func SetBlockStore(bs sm.BlockStore) { blockStore = bs } -func SetMempool(mem Mempool) { +func SetMempool(mem mempl.Mempool) { mempool = mem }