diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go
index de32e9f51..9293bd16f 100644
--- a/mempool/v1/mempool.go
+++ b/mempool/v1/mempool.go
@@ -2,7 +2,6 @@ package v1
 
 import (
 	"bytes"
-	"context"
 	"errors"
 	"fmt"
 	"reflect"
@@ -177,7 +176,7 @@ func (txmp *TxMempool) SizeBytes() int64 {
 //
 // NOTE: The caller must obtain a write-lock via Lock() prior to execution.
 func (txmp *TxMempool) FlushAppConn() error {
-	return txmp.proxyAppConn.FlushSync(context.Background())
+	return txmp.proxyAppConn.FlushSync()
 }
 
 // WaitForNextTx returns a blocking channel that will be closed when the next
@@ -267,12 +266,7 @@ func (txmp *TxMempool) CheckTx(
 		return mempool.ErrTxInCache
 	}
 
-	reqRes, err := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
-	if err != nil {
-		txmp.cache.Remove(tx)
-		return err
-	}
-
+	reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
 	reqRes.SetCallback(func(res *abci.Response) {
 		if txmp.recheckCursor != nil {
 			panic("recheck cursor is non-nil in CheckTx callback")
@@ -521,7 +515,7 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo
 			"rejected bad transaction",
 			"priority", wtx.priority,
 			"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
-			"peer_id", txInfo.SenderNodeID,
+			"peer_id", txInfo.SenderP2PID,
 			"code", checkTxRes.CheckTx.Code,
 			"post_check_err", err,
 		)
@@ -726,7 +720,6 @@ func (txmp *TxMempool) updateReCheckTxs() {
 	txmp.recheckCursor = txmp.gossipIndex.Front()
 	txmp.recheckEnd = txmp.gossipIndex.Back()
 
-	ctx := context.Background()
 	for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() {
 		wtx := e.Value.(*WrappedTx)
 
@@ -734,20 +727,15 @@ func (txmp *TxMempool) updateReCheckTxs() {
 		// Only execute CheckTx if the transaction is not marked as removed which
 		// could happen if the transaction was evicted.
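+		// Note: CheckTxAsync does not return an error here, so there is no error to handle or retry.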
 		if !txmp.txStore.IsTxRemoved(wtx.hash) {
-			_, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
+			txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{
 				Tx:   wtx.tx,
 				Type: abci.CheckTxType_Recheck,
 			})
-			if err != nil {
-				// no need in retrying since the tx will be rechecked after the next block
-				txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
-			}
 		}
 	}
 
-	if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
-		txmp.logger.Error("failed to flush transactions during rechecking", "err", err)
-	}
+	txmp.proxyAppConn.FlushAsync()
 }
 
 // canAddTx returns an error if we cannot insert the provided *WrappedTx into
diff --git a/mempool/v1/priority_queue.go b/mempool/v1/priority_queue.go
index df74a92d3..b2bf59f2b 100644
--- a/mempool/v1/priority_queue.go
+++ b/mempool/v1/priority_queue.go
@@ -4,7 +4,7 @@ import (
 	"container/heap"
 	"sort"
 
-	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
+	tmsync "github.com/tendermint/tendermint/libs/sync"
 )
 
 var _ heap.Interface = (*TxPriorityQueue)(nil)
diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go
index 94d0580e9..a233d6d32 100644
--- a/mempool/v1/reactor.go
+++ b/mempool/v1/reactor.go
@@ -1,385 +1,310 @@
 package v1
 
 import (
-	"context"
 	"errors"
 	"fmt"
-	"runtime/debug"
-	"sync"
+	"time"
 
-	"github.com/tendermint/tendermint/config"
-	"github.com/tendermint/tendermint/internal/libs/clist"
-	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
-	"github.com/tendermint/tendermint/internal/mempool"
-	"github.com/tendermint/tendermint/internal/p2p"
+	cfg "github.com/tendermint/tendermint/config"
+	"github.com/tendermint/tendermint/libs/clist"
 	"github.com/tendermint/tendermint/libs/log"
-	"github.com/tendermint/tendermint/libs/service"
+	tmsync "github.com/tendermint/tendermint/libs/sync"
+	"github.com/tendermint/tendermint/mempool"
+	"github.com/tendermint/tendermint/p2p"
 	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
 	"github.com/tendermint/tendermint/types"
 )
 
-var (
-	_ service.Service = (*Reactor)(nil)
-	_ p2p.Wrapper     = (*protomem.Message)(nil)
-)
-
-// Reactor implements a service that contains mempool of txs that are broadcasted
-// amongst peers. It maintains a map from peer ID to counter, to prevent gossiping
-// txs to the peers you received it from.
+// Reactor handles mempool tx broadcasting amongst peers.
+// It maintains a map from peer ID to counter, to prevent gossiping txs to the
+// peers you received it from.
 type Reactor struct {
-	service.BaseService
-
-	cfg     *config.MempoolConfig
+	p2p.BaseReactor
+	config  *cfg.MempoolConfig
 	mempool *TxMempool
-	ids     *mempool.MempoolIDs
-
-	mempoolCh   *p2p.Channel
-	peerUpdates *p2p.PeerUpdates
-	closeCh     chan struct{}
-
-	// peerWG is used to coordinate graceful termination of all peer broadcasting
-	// goroutines.
-	peerWG sync.WaitGroup
-
-	// observePanic is a function for observing panics that were recovered in methods on
-	// Reactor. observePanic is called with the recovered value.
-	observePanic func(interface{})
-
-	mtx          tmsync.Mutex
-	peerRoutines map[types.NodeID]*tmsync.Closer
+	ids     *mempoolIDs
 }
 
-// NewReactor returns a reference to a new reactor.
-func NewReactor(
-	logger log.Logger,
-	cfg *config.MempoolConfig,
-	txmp *TxMempool,
-	mempoolCh *p2p.Channel,
-	peerUpdates *p2p.PeerUpdates,
-) *Reactor {
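+// mempoolIDs assigns each connected peer a compact uint16 ID. The IDs are
+// recorded per tx, so a tx is not gossiped back to the peers that sent it.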
+type mempoolIDs struct {
+	mtx       tmsync.RWMutex
+	peerMap   map[p2p.ID]uint16
+	nextID    uint16              // assumes that a node will never have over 65536 active peers
+	activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
+}
 
-	r := &Reactor{
-		cfg:          cfg,
-		mempool:      txmp,
-		ids:          mempool.NewMempoolIDs(),
-		mempoolCh:    mempoolCh,
-		peerUpdates:  peerUpdates,
-		closeCh:      make(chan struct{}),
-		peerRoutines: make(map[types.NodeID]*tmsync.Closer),
-		observePanic: defaultObservePanic,
-	}
-
-	r.BaseService = *service.NewBaseService(logger, "Mempool", r)
-	return r
-}
-
-func defaultObservePanic(r interface{}) {}
+// Reserve searches for the next unused ID and assigns it to the
+// peer.
+func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
+	ids.mtx.Lock()
+	defer ids.mtx.Unlock()
+
+	curID := ids.nextPeerID()
+	ids.peerMap[peer.ID()] = curID
+	ids.activeIDs[curID] = struct{}{}
+}
+
+// nextPeerID returns the next unused peer ID to use.
+// This assumes that ids's mutex is already locked.
+func (ids *mempoolIDs) nextPeerID() uint16 {
+	if len(ids.activeIDs) == mempool.MaxActiveIDs {
+		panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", mempool.MaxActiveIDs))
+	}
+
+	_, idExists := ids.activeIDs[ids.nextID]
+	for idExists {
+		ids.nextID++
+		_, idExists = ids.activeIDs[ids.nextID]
+	}
+	curID := ids.nextID
+	ids.nextID++
+	return curID
+}
 
-// GetChannelShims returns a map of ChannelDescriptorShim objects, where each
-// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
-// p2p proto.Message the new p2p Channel is responsible for handling.
-//
-//
-// TODO: Remove once p2p refactor is complete.
-// ref: https://github.com/tendermint/tendermint/issues/5670
-func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim {
-	largestTx := make([]byte, cfg.MaxTxBytes)
+// Reclaim returns the ID reserved for the peer back to unused pool.
+func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
+	ids.mtx.Lock()
+	defer ids.mtx.Unlock()
+
+	removedID, ok := ids.peerMap[peer.ID()]
+	if ok {
+		delete(ids.activeIDs, removedID)
+		delete(ids.peerMap, peer.ID())
+	}
+}
+
+// GetForPeer returns an ID reserved for the peer.
+func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
+	ids.mtx.RLock()
+	defer ids.mtx.RUnlock()
+
+	return ids.peerMap[peer.ID()]
+}
+
+func newMempoolIDs() *mempoolIDs {
+	return &mempoolIDs{
+		peerMap:   make(map[p2p.ID]uint16),
+		activeIDs: map[uint16]struct{}{0: {}},
+		nextID:    1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
+	}
+}
+
+// NewReactor returns a new Reactor with the given config and mempool.
+func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor {
+	memR := &Reactor{
+		config:  config,
+		mempool: mempool,
+		ids:     newMempoolIDs(),
+	}
+	memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
+	return memR
+}
+
+// InitPeer implements Reactor by creating a state for the peer.
+func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
+	memR.ids.ReserveForPeer(peer)
+	return peer
+}
+
+// SetLogger sets the Logger on the reactor.
+func (memR *Reactor) SetLogger(l log.Logger) {
+	memR.Logger = l
+}
+
+// OnStart implements p2p.BaseReactor.
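+// Note that broadcast routines are not started here; AddPeer launches one per peer.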
+func (memR *Reactor) OnStart() error {
+	if !memR.config.Broadcast {
+		memR.Logger.Info("Tx broadcasting is disabled")
+	}
+	return nil
+}
+
+// GetChannels implements Reactor by returning the list of channels for this
+// reactor.
+func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
+	largestTx := make([]byte, memR.config.MaxTxBytes)
 	batchMsg := protomem.Message{
 		Sum: &protomem.Message_Txs{
 			Txs: &protomem.Txs{Txs: [][]byte{largestTx}},
 		},
 	}
 
-	return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
-		mempool.MempoolChannel: {
-			MsgType: new(protomem.Message),
-			Descriptor: &p2p.ChannelDescriptor{
-				ID:                  byte(mempool.MempoolChannel),
-				Priority:            5,
-				RecvMessageCapacity: batchMsg.Size(),
-				RecvBufferCapacity:  128,
-				MaxSendBytes:        5000,
-			},
+	return []*p2p.ChannelDescriptor{
+		{
+			ID:                  mempool.MempoolChannel,
+			Priority:            5,
+			RecvMessageCapacity: batchMsg.Size(),
 		},
 	}
 }
 
-// OnStart starts separate go routines for each p2p Channel and listens for
-// envelopes on each. In addition, it also listens for peer updates and handles
-// messages on that p2p channel accordingly. The caller must be sure to execute
-// OnStop to ensure the outbound p2p Channels are closed.
-func (r *Reactor) OnStart() error {
-	if !r.cfg.Broadcast {
-		r.Logger.Info("tx broadcasting is disabled")
+// AddPeer implements Reactor.
+// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
+func (memR *Reactor) AddPeer(peer p2p.Peer) {
+	if memR.config.Broadcast {
+		go memR.broadcastTxRoutine(peer)
 	}
-
-	go r.processMempoolCh()
-	go r.processPeerUpdates()
-
-	return nil
 }
 
-// OnStop stops the reactor by signaling to all spawned goroutines to exit and
-// blocking until they all exit.
-func (r *Reactor) OnStop() {
-	r.mtx.Lock()
-	for _, c := range r.peerRoutines {
-		c.Close()
-	}
-	r.mtx.Unlock()
-
-	// wait for all spawned peer tx broadcasting goroutines to gracefully exit
-	r.peerWG.Wait()
-
-	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
-	// p2p Channels should execute Close().
-	close(r.closeCh)
-
-	// Wait for all p2p Channels to be closed before returning. This ensures we
-	// can easily reason about synchronization of all p2p Channels and ensure no
-	// panics will occur.
-	<-r.mempoolCh.Done()
-	<-r.peerUpdates.Done()
+// RemovePeer implements Reactor.
+func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
+	memR.ids.Reclaim(peer)
+	// broadcast routine checks if peer is gone and returns
 }
 
-// handleMempoolMessage handles envelopes sent from peers on the MempoolChannel.
-// For every tx in the message, we execute CheckTx. It returns an error if an
-// empty set of txs are sent in an envelope or if we receive an unexpected
-// message type.
-func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {
-	logger := r.Logger.With("peer", envelope.From)
-
-	switch msg := envelope.Message.(type) {
-	case *protomem.Txs:
-		protoTxs := msg.GetTxs()
-		if len(protoTxs) == 0 {
-			return errors.New("empty txs received from peer")
-		}
-
-		txInfo := mempool.TxInfo{SenderID: r.ids.GetForPeer(envelope.From)}
-		if len(envelope.From) != 0 {
-			txInfo.SenderNodeID = envelope.From
-		}
-
-		for _, tx := range protoTxs {
-			if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil {
-				logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err)
-			}
-		}
-
-	default:
-		return fmt.Errorf("received unknown message: %T", msg)
-	}
-
-	return nil
-}
 
+// Receive implements Reactor.
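+// It is called by the p2p Switch for each message received on the mempool channel.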
+// It adds any received transactions to the mempool.
+func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
+	msg, err := memR.decodeMsg(msgBytes)
+	if err != nil {
+		memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
+		memR.Switch.StopPeerForError(src, err)
+		return
+	}
+	memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
+
+	txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
+	if src != nil {
+		txInfo.SenderP2PID = src.ID()
+	}
+	for _, tx := range msg.Txs {
+		err = memR.mempool.CheckTx(tx, nil, txInfo)
+		if err == mempool.ErrTxInCache {
+			memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
+		} else if err != nil {
+			memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
+		}
+	}
+	// broadcasting happens from go routines per peer
+}
 
-// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
-// It will handle errors and any possible panics gracefully. A caller can handle
-// any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
-	defer func() {
-		if e := recover(); e != nil {
-			r.observePanic(e)
-			err = fmt.Errorf("panic in processing message: %v", e)
-			r.Logger.Error(
-				"recovering from processing message panic",
-				"err", err,
-				"stack", string(debug.Stack()),
-			)
-		}
-	}()
-
-	r.Logger.Debug("received message", "peer", envelope.From)
-
-	switch chID {
-	case mempool.MempoolChannel:
-		err = r.handleMempoolMessage(envelope)
-
-	default:
-		err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message)
-	}
-
-	return err
-}
 
+// PeerState describes the state of a peer.
+type PeerState interface {
+	GetHeight() int64
+}
 
-// processMempoolCh implements a blocking event loop where we listen for p2p
-// Envelope messages from the mempoolCh.
-func (r *Reactor) processMempoolCh() {
-	defer r.mempoolCh.Close()
-
-	for {
-		select {
-		case envelope := <-r.mempoolCh.In:
-			if err := r.handleMessage(r.mempoolCh.ID, envelope); err != nil {
-				r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
-				r.mempoolCh.Error <- p2p.PeerError{
-					NodeID: envelope.From,
-					Err:    err,
-				}
-			}
-
-		case <-r.closeCh:
-			r.Logger.Debug("stopped listening on mempool channel; closing...")
-			return
-		}
-	}
-}
 
+// Send new mempool txs to peer.
+func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
+	peerID := memR.ids.GetForPeer(peer)
+	var next *clist.CElement
+
-// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
-// check if the reactor is running and if we've already started a tx broadcasting
-// goroutine or not. If not, we start one for the newly added peer. For down or
-// removed peers, we remove the peer from the mempool peer ID set and signal to
-// stop the tx broadcasting goroutine.
-func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
-	r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
-
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
-	switch peerUpdate.Status {
-	case p2p.PeerStatusUp:
-		// Do not allow starting new tx broadcast loops after reactor shutdown
-		// has been initiated. This can happen after we've manually closed all
-		// peer broadcast loops and closed r.closeCh, but the router still sends
-		// in-flight peer updates.
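+	// Gossip txs to this peer until the peer or the reactor stops.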
-		if !r.IsRunning() {
-			return
-		}
-
-		if r.cfg.Broadcast {
-			// Check if we've already started a goroutine for this peer, if not we create
-			// a new done channel so we can explicitly close the goroutine if the peer
-			// is later removed, we increment the waitgroup so the reactor can stop
-			// safely, and finally start the goroutine to broadcast txs to that peer.
-			_, ok := r.peerRoutines[peerUpdate.NodeID]
-			if !ok {
-				closer := tmsync.NewCloser()
-
-				r.peerRoutines[peerUpdate.NodeID] = closer
-				r.peerWG.Add(1)
-
-				r.ids.ReserveForPeer(peerUpdate.NodeID)
-
-				// start a broadcast routine ensuring all txs are forwarded to the peer
-				go r.broadcastTxRoutine(peerUpdate.NodeID, closer)
-			}
-		}
-
-	case p2p.PeerStatusDown:
-		r.ids.Reclaim(peerUpdate.NodeID)
-
-		// Check if we've started a tx broadcasting goroutine for this peer.
-		// If we have, we signal to terminate the goroutine via the channel's closure.
-		// This will internally decrement the peer waitgroup and remove the peer
-		// from the map of peer tx broadcasting goroutines.
-		closer, ok := r.peerRoutines[peerUpdate.NodeID]
-		if ok {
-			closer.Close()
-		}
-	}
-}
-
-// processPeerUpdates initiates a blocking process where we listen for and handle
-// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
-// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates() {
-	defer r.peerUpdates.Close()
-
-	for {
-		select {
-		case peerUpdate := <-r.peerUpdates.Updates():
-			r.processPeerUpdate(peerUpdate)
-
-		case <-r.closeCh:
-			r.Logger.Debug("stopped listening on peer updates channel; closing...")
-			return
-		}
-	}
-}
-
-func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) {
-	peerMempoolID := r.ids.GetForPeer(peerID)
-	var nextGossipTx *clist.CElement
-
-	// remove the peer ID from the map of routines and mark the waitgroup as done
-	defer func() {
-		r.mtx.Lock()
-		delete(r.peerRoutines, peerID)
-		r.mtx.Unlock()
-
-		r.peerWG.Done()
-
-		if e := recover(); e != nil {
-			r.observePanic(e)
-			r.Logger.Error(
-				"recovering from broadcasting mempool loop",
-				"err", e,
-				"stack", string(debug.Stack()),
-			)
-		}
-	}()
-
-	for {
-		if !r.IsRunning() {
-			return
-		}
+	for {
+		// In case both next.NextWaitChan() and peer.Quit() fire at the same
+		// time, make sure the reactor and the peer are still running.
+		if !memR.IsRunning() || !peer.IsRunning() {
+			return
+		}
 
 		// This happens because the CElement we were looking at got garbage
 		// collected (removed). That is, .NextWait() returned nil. Go ahead and
 		// start from the beginning.
-		if nextGossipTx == nil {
+		if next == nil {
 			select {
-			case <-r.mempool.WaitForNextTx(): // wait until a tx is available
-				if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil {
+			case <-memR.mempool.WaitForNextTx(): // Wait until a tx is available
+				if next = memR.mempool.NextGossipTx(); next == nil {
 					continue
 				}
 
-			case <-closer.Done():
-				// The peer is marked for removal via a PeerUpdate as the doneCh was
-				// explicitly closed to signal we should exit.
+			case <-peer.Quit():
 				return
 
-			case <-r.closeCh:
-				// The reactor has signaled that we are stopped and thus we should
-				// implicitly exit this peer's goroutine.
+			case <-memR.Quit():
 				return
 			}
 		}
 
-		memTx := nextGossipTx.Value.(*WrappedTx)
+		// Make sure the peer is up to date.
+		peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
+		if !ok {
+			// Peer does not have a state yet. We set it in the consensus reactor, but
+			// when we add peer in Switch, the order we call reactors#AddPeer is
+			// different every time due to us using a map. Sometimes other reactors
+			// will be initialized before the consensus reactor. We should wait a few
+			// milliseconds and retry.
+			time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
+			continue
+		}
 
-		// NOTE: Transaction batching was disabled due to:
+		// Allow for a lag of 1 block.
+		memTx := next.Value.(*WrappedTx)
+		if peerState.GetHeight() < memTx.height-1 {
+			time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
+			continue
+		}
+
+		// NOTE: Transaction batching was disabled due to
 		// https://github.com/tendermint/tendermint/issues/5796
-		if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
-			// Send the mempool tx to the corresponding peer. Note, the peer may be
-			// behind and thus would not be able to process the mempool tx correctly.
-			r.mempoolCh.Out <- p2p.Envelope{
-				To: peerID,
-				Message: &protomem.Txs{
-					Txs: [][]byte{memTx.tx},
+		if ok := memR.mempool.txStore.TxHasPeer(memTx.hash, peerID); !ok {
+			msg := protomem.Message{
+				Sum: &protomem.Message_Txs{
+					Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
 				},
 			}
-			r.Logger.Debug(
-				"gossiped tx to peer",
-				"tx", fmt.Sprintf("%X", memTx.tx.Hash()),
-				"peer", peerID,
-			)
+
+			bz, err := msg.Marshal()
+			if err != nil {
+				panic(err)
+			}
+
+			success := peer.Send(mempool.MempoolChannel, bz)
+			if !success {
+				time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
+				continue
+			}
 		}
 
 		select {
-		case <-nextGossipTx.NextWaitChan():
-			nextGossipTx = nextGossipTx.Next()
+		case <-next.NextWaitChan():
+			// see the start of the for loop for nil check
+			next = next.Next()
 
-		case <-closer.Done():
-			// The peer is marked for removal via a PeerUpdate as the doneCh was
-			// explicitly closed to signal we should exit.
+		case <-peer.Quit():
 			return
 
-		case <-r.closeCh:
-			// The reactor has signaled that we are stopped and thus we should
-			// implicitly exit this peer's goroutine.
+		case <-memR.Quit():
 			return
 		}
 	}
 }
+
+//-----------------------------------------------------------------------------
+// Messages
+
+func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
+	msg := protomem.Message{}
+	err := msg.Unmarshal(bz)
+	if err != nil {
+		return TxsMessage{}, err
+	}
+
+	var message TxsMessage
+
+	if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
+		txs := i.Txs.GetTxs()
+
+		if len(txs) == 0 {
+			return message, errors.New("empty TxsMessage")
+		}
+
+		decoded := make([]types.Tx, len(txs))
+		for j, tx := range txs {
+			decoded[j] = types.Tx(tx)
+		}
+
+		message = TxsMessage{
+			Txs: decoded,
+		}
+		return message, nil
+	}
+	return message, fmt.Errorf("msg type: %T is not supported", msg)
+}
+
+//-------------------------------------
+
+// TxsMessage is a Message containing transactions.
+type TxsMessage struct {
+	Txs []types.Tx
+}
+
+// String returns a string representation of the TxsMessage.
+func (m *TxsMessage) String() string {
+	return fmt.Sprintf("[TxsMessage %v]", m.Txs)
+}
diff --git a/mempool/v1/tx.go b/mempool/v1/tx.go
index c5b7ca82f..b3b920d2e 100644
--- a/mempool/v1/tx.go
+++ b/mempool/v1/tx.go
@@ -4,8 +4,8 @@ import (
 	"sort"
 	"time"
 
-	"github.com/tendermint/tendermint/internal/libs/clist"
-	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
+	"github.com/tendermint/tendermint/libs/clist"
+	tmsync "github.com/tendermint/tendermint/libs/sync"
 	"github.com/tendermint/tendermint/types"
 )