diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index b6694c9b2..9b845a29a 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -66,6 +66,7 @@ type CListMempool struct { logger log.Logger metrics *Metrics + onNewTx func(Tx) } var _ Mempool = &CListMempool{} @@ -194,14 +195,23 @@ func (mem *CListMempool) Flush() { _ = atomic.SwapInt64(&mem.txsBytes, 0) } +// TxsFront returns the first transaction in the ordered list for peer +// goroutines to call .NextWait() on. func (mem *CListMempool) TxsFront() *clist.CElement { return mem.txs.Front() } +// TxsWaitChan returns a channel to wait on transactions. It will be closed +// once the mempool is not empty (ie. the internal `mem.txs` has at least one +// element). func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.WaitChan() } +func (mem *CListMempool) OnNewTx(cb func(Tx)) { + mem.onNewTx = cb +} + // It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. @@ -337,6 +347,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { mem.txsMap.Store(txKey(memTx.tx), e) atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) + mem.onNewTx(memTx) } // Called from: diff --git a/mempool/mempool.go b/mempool/mempool.go index b04f3f68c..549b07f9d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -4,7 +4,6 @@ import ( "fmt" abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/types" ) @@ -35,15 +34,10 @@ type Mempool interface { // transactions (~ all available transactions). ReapMaxTxs(max int) types.Txs - // TxsWaitChan returns a channel to wait on transactions. It will be closed - // once the mempool is not empty (ie. the internal `mem.txs` has at least one - // element) - TxsWaitChan() <-chan struct{} - - // TxsFront returns the first transaction in the ordered list for peer - // goroutines to call .NextWait() on. - // FIXME: leaking implementation details! - TxsFront() *clist.CElement + // OnNewTx allows one to set a callback, which will be called when a new + // transaction is added to the mempool. + // Used by Reactor to broadcast new transactions to peers. + OnNewTx(cb func(Tx)) // Lock locks the mempool. The consensus must be able to hold lock to safely update. Lock() @@ -81,6 +75,17 @@ type Mempool interface { //-------------------------------------------------------------------------------- +// Tx wraps raw transaction and adds a few methods to extract useful +// information like where transaction is coming from, at which height it was +// received, etc. +type Tx interface { + Height() int64 + HasSender(uint16) bool + Raw() types.Tx +} + +//-------------------------------------------------------------------------------- + // PreCheckFunc is an optional filter executed before CheckTx and rejects // transaction if false is returned. An example would be to ensure that a // transaction doesn't exceeded the block size. diff --git a/mempool/reactor.go b/mempool/reactor.go index 65631b0cc..f31f01768 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -10,7 +10,6 @@ import ( amino "github.com/tendermint/go-amino" cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" @@ -39,6 +38,8 @@ type Reactor struct { config *cfg.MempoolConfig mempool Mempool ids *mempoolIDs + // txs to send to peers + txs []Tx } type mempoolIDs struct { @@ -110,8 +111,14 @@ func NewReactor(config *cfg.MempoolConfig, mempool Mempool) *Reactor { config: config, mempool: mempool, ids: newMempoolIDs(), + txs: make([]Tx, 0), } memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR) + if config.Broadcast { + mempool.OnNewTx(func(tx Tx) { + memR.txs = append(memR.txs, tx) + }) + } return memR } @@ -152,7 +159,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { memR.ids.ReserveForPeer(peer) - go memR.broadcastTxRoutine(peer) + go memR.broadcastTxRoutine(0, peer) } // RemovePeer implements Reactor. @@ -190,76 +197,71 @@ type PeerState interface { GetHeight() int64 } -// Send new mempool txs to peer. -func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { - if !memR.config.Broadcast { - return - } - - peerID := memR.ids.GetForPeer(peer) - var next *clist.CElement - for { - // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time - if !memR.IsRunning() || !peer.IsRunning() { - return - } - // This happens because the CElement we were looking at got garbage - // collected (removed). That is, .NextWait() returned nil. Go ahead and - // start from the beginning. - if next == nil { - select { - case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available - if next = memR.mempool.TxsFront(); next == nil { - continue - } - case <-peer.Quit(): - return - case <-memR.Quit(): - return - } - } - - memTx := next.Value.(*mempoolTx) - - // make sure the peer is up to date - peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - // Peer does not have a state yet. We set it in the consensus reactor, but - // when we add peer in Switch, the order we call reactors#AddPeer is - // different every time due to us using a map. Sometimes other reactors - // will be initialized before the consensus reactor. We should wait a few - // milliseconds and retry. - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - - // ensure peer hasn't already sent us this tx - if _, ok := memTx.senders.Load(peerID); !ok { - // send memTx - msg := &TxMessage{Tx: memTx.tx} - success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) - if !success { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - } - +func (memR *Reactor) broadcastTxToPeers(tx Tx) { + for pc := range memR.txsC { select { - case <-next.NextWaitChan(): - // see the start of the for loop for nil check - next = next.Next() - case <-peer.Quit(): - return + case pc <- tx: + default: + memR.Logger.Error("Peer's queue is full. Tx won't be send", "queue_size", cap(pc), "tx", tx) + } + } +} + +func (memR *Reactor) broadcastTxRoutine(pc <-chan Tx, peer p2p.Peer) { + for { + select { + case tx := <-pc: + // Try to send the tx until success OR peer/mempool reactor is stopped. + var success bool + for !success { + if !memR.IsRunning() || !peer.IsRunning() { + return + } + + success = memR.sendTxToPeer(tx, peer) + } case <-memR.Quit(): return + case <-peer.Quit(): + return } } } +func (memR *Reactor) sendTxToPeer(tx Tx, peer p2p.Peer) (success bool) { + peerID := memR.ids.GetForPeer(peer) + + // make sure the peer is up to date + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) + if !ok { + // Peer does not have a state yet. We set it in the consensus reactor, but + // when we add peer in Switch, the order we call reactors#AddPeer is + // different every time due to us using a map. Sometimes other reactors + // will be initialized before the consensus reactor. We should wait a few + // milliseconds and retry. + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + return false + } + + if peerState.GetHeight() < tx.Height()-1 { // Allow for a lag of 1 block + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + return false + } + + // ensure peer hasn't already sent us this tx + if !tx.HasSender(peerID) { + // send memTx + msg := &TxMessage{Tx: tx.Raw()} + success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) + if !success { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + return false + } + } + + return true +} + //----------------------------------------------------------------------------- // Messages