Mirror of https://github.com/tendermint/tendermint.git, synced 2026-01-06 21:36:26 +00:00
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"runtime/debug"
 	"sync"
-	"time"
 
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/libs/clist"
@@ -24,13 +23,6 @@ var (
 	_ p2p.Wrapper = (*protomem.Message)(nil)
 )
 
-// PeerManager defines the interface contract required for getting necessary
-// peer information. This should eventually be replaced with a message-oriented
-// approach utilizing the p2p stack.
-type PeerManager interface {
-	GetHeight(types.NodeID) int64
-}
-
 // Reactor implements a service that contains mempool of txs that are broadcasted
 // amongst peers. It maintains a map from peer ID to counter, to prevent gossiping
 // txs to the peers you received it from.
@@ -41,11 +33,6 @@ type Reactor struct {
 	mempool *CListMempool
 	ids     *mempool.MempoolIDs
 
-	// XXX: Currently, this is the only way to get information about a peer. Ideally,
-	// we rely on message-oriented communication to get necessary peer data.
-	// ref: https://github.com/tendermint/tendermint/issues/5670
-	peerMgr PeerManager
-
 	mempoolCh   *p2p.Channel
 	peerUpdates *p2p.PeerUpdates
 	closeCh     chan struct{}
@@ -62,7 +49,6 @@ type Reactor struct {
 func NewReactor(
 	logger log.Logger,
 	cfg *config.MempoolConfig,
-	peerMgr PeerManager,
 	mp *CListMempool,
 	mempoolCh *p2p.Channel,
 	peerUpdates *p2p.PeerUpdates,
@@ -70,7 +56,6 @@ func NewReactor(
 
 	r := &Reactor{
 		cfg:       cfg,
-		peerMgr:   peerMgr,
 		mempool:   mp,
 		ids:       mempool.NewMempoolIDs(),
 		mempoolCh: mempoolCh,
@@ -355,15 +340,6 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
 
 		memTx := next.Value.(*mempoolTx)
 
-		if r.peerMgr != nil {
-			height := r.peerMgr.GetHeight(peerID)
-			if height > 0 && height < memTx.Height()-1 {
-				// allow for a lag of one block
-				time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
-				continue
-			}
-		}
-
 		// NOTE: Transaction batching was disabled due to:
 		// https://github.com/tendermint/tendermint/issues/5796
 
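For context on what the deleted guard did: when a peer's reported height trailed a transaction's height by more than one block, the routine slept for PeerCatchupSleepIntervalMS and retried instead of sending, so peers still catching up were not flooded with txs they could not yet process. Below is a minimal standalone sketch of that policy, not the reactor's actual code; peerHeightSource, staticHeights, and catchupSleep are illustrative stand-ins for PeerManager and mempool.PeerCatchupSleepIntervalMS.

package main

import (
	"fmt"
	"time"
)

// peerHeightSource is a stand-in for the removed PeerManager interface:
// anything that reports a peer's last known block height (0 = unknown).
type peerHeightSource interface {
	GetHeight(peerID string) int64
}

// staticHeights is a fixed-table implementation, for illustration only.
type staticHeights map[string]int64

func (s staticHeights) GetHeight(id string) int64 { return s[id] }

// catchupSleep is an illustrative stand-in for PeerCatchupSleepIntervalMS.
const catchupSleep = 100 * time.Millisecond

// shouldSendTx mirrors the removed guard: send unless the peer is known
// (height > 0) and lags the tx's height by more than one block.
func shouldSendTx(src peerHeightSource, peerID string, txHeight int64) bool {
	h := src.GetHeight(peerID)
	return h <= 0 || h >= txHeight-1
}

func main() {
	src := staticHeights{"peerA": 10, "peerB": 5}
	for _, p := range []string{"peerA", "peerB", "peerC"} {
		if shouldSendTx(src, p, 10) {
			fmt.Printf("%s: send tx from height 10\n", p)
		} else {
			fmt.Printf("%s: lagging, sleep %v and retry\n", p, catchupSleep)
		}
	}
}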
@@ -70,7 +70,6 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint)
 	rts.reactors[nodeID] = NewReactor(
 		rts.logger.With("nodeID", nodeID),
 		config,
-		rts.network.Nodes[nodeID].PeerManager,
 		mempool,
 		rts.mempoolChnnels[nodeID],
 		rts.peerUpdates[nodeID],
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"runtime/debug"
 	"sync"
-	"time"
 
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/libs/clist"
@@ -24,13 +23,6 @@ var (
 	_ p2p.Wrapper = (*protomem.Message)(nil)
 )
 
-// PeerManager defines the interface contract required for getting necessary
-// peer information. This should eventually be replaced with a message-oriented
-// approach utilizing the p2p stack.
-type PeerManager interface {
-	GetHeight(types.NodeID) int64
-}
-
 // Reactor implements a service that contains mempool of txs that are broadcasted
 // amongst peers. It maintains a map from peer ID to counter, to prevent gossiping
 // txs to the peers you received it from.
@@ -41,11 +33,6 @@ type Reactor struct {
 	mempool *TxMempool
 	ids     *mempool.MempoolIDs
 
-	// XXX: Currently, this is the only way to get information about a peer. Ideally,
-	// we rely on message-oriented communication to get necessary peer data.
-	// ref: https://github.com/tendermint/tendermint/issues/5670
-	peerMgr PeerManager
-
 	mempoolCh   *p2p.Channel
 	peerUpdates *p2p.PeerUpdates
 	closeCh     chan struct{}
@@ -66,7 +53,6 @@ type Reactor struct {
 func NewReactor(
 	logger log.Logger,
 	cfg *config.MempoolConfig,
-	peerMgr PeerManager,
 	txmp *TxMempool,
 	mempoolCh *p2p.Channel,
 	peerUpdates *p2p.PeerUpdates,
@@ -74,7 +60,6 @@ func NewReactor(
 
 	r := &Reactor{
 		cfg:       cfg,
-		peerMgr:   peerMgr,
 		mempool:   txmp,
 		ids:       mempool.NewMempoolIDs(),
 		mempoolCh: mempoolCh,
@@ -364,15 +349,6 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
 
 		memTx := nextGossipTx.Value.(*WrappedTx)
 
-		if r.peerMgr != nil {
-			height := r.peerMgr.GetHeight(peerID)
-			if height > 0 && height < memTx.height-1 {
-				// allow for a lag of one block
-				time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
-				continue
-			}
-		}
-
 		// NOTE: Transaction batching was disabled due to:
 		// https://github.com/tendermint/tendermint/issues/5796
 		if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
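The trailing context line shows the dedup filter the v1 reactor retains: txStore.TxHasPeer skips peers already recorded as having the tx, typically because the tx arrived from them. A rough self-contained sketch of that kind of bookkeeping follows, with hypothetical names; txPeerSet is an illustrative stand-in, not the actual txStore.

package main

import "fmt"

// txPeerSet tracks, per tx hash, which peer IDs already have the tx,
// so the broadcast loop can skip re-sending it to them.
type txPeerSet struct {
	peers map[[32]byte]map[uint16]struct{}
}

func newTxPeerSet() *txPeerSet {
	return &txPeerSet{peers: make(map[[32]byte]map[uint16]struct{})}
}

// add records that peerID has the tx with the given hash.
func (s *txPeerSet) add(hash [32]byte, peerID uint16) {
	if s.peers[hash] == nil {
		s.peers[hash] = make(map[uint16]struct{})
	}
	s.peers[hash][peerID] = struct{}{}
}

// txHasPeer reports whether peerID is already known to have the tx,
// playing the role of txStore.TxHasPeer in the context line above.
func (s *txPeerSet) txHasPeer(hash [32]byte, peerID uint16) bool {
	_, ok := s.peers[hash][peerID]
	return ok
}

func main() {
	var h [32]byte
	h[0] = 0xaa

	set := newTxPeerSet()
	set.add(h, 7) // peer 7 sent us this tx

	fmt.Println(set.txHasPeer(h, 7)) // true: skip gossiping back to peer 7
	fmt.Println(set.txHasPeer(h, 9)) // false: peer 9 still needs it
}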
@@ -67,7 +67,6 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
 	rts.reactors[nodeID] = NewReactor(
 		rts.logger.With("nodeID", nodeID),
 		cfg.Mempool,
-		rts.network.Nodes[nodeID].PeerManager,
 		mempool,
 		rts.mempoolChannels[nodeID],
 		rts.peerUpdates[nodeID],
@@ -1055,37 +1055,6 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
 	return delay
 }
 
-// GetHeight returns a peer's height, as reported via SetHeight, or 0 if the
-// peer or height is unknown.
-//
-// FIXME: This is a temporary workaround to share state between the consensus
-// and mempool reactors, carried over from the legacy P2P stack. Reactors should
-// not have dependencies on each other, instead tracking this themselves.
-func (m *PeerManager) GetHeight(peerID types.NodeID) int64 {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-
-	peer, _ := m.store.Get(peerID)
-	return peer.Height
-}
-
-// SetHeight stores a peer's height, making it available via GetHeight.
-//
-// FIXME: This is a temporary workaround to share state between the consensus
-// and mempool reactors, carried over from the legacy P2P stack. Reactors should
-// not have dependencies on each other, instead tracking this themselves.
-func (m *PeerManager) SetHeight(peerID types.NodeID, height int64) error {
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-
-	peer, ok := m.store.Get(peerID)
-	if !ok {
-		peer = m.newPeerInfo(peerID)
-	}
-	peer.Height = height
-	return m.store.Set(peer)
-}
-
 // peerStore stores information about peers. It is not thread-safe, assuming it
 // is only used by PeerManager which handles concurrency control. This allows
 // the manager to execute multiple operations atomically via its own mutex.
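One detail worth noting in the removed GetHeight: `peer, _ := m.store.Get(peerID)` discards the ok flag and relies on the zero-value record returned on a miss, so peer.Height is already 0 for unknown peers without an explicit branch. A small self-contained illustration of that Go idiom, with simplified stand-in types (peerInfo and store here are not the real p2p types):

package main

import "fmt"

// peerInfo is a simplified stand-in for the manager's peer record.
type peerInfo struct {
	ID     string
	Height int64
}

// store is a simplified stand-in for the manager's peerStore.
type store map[string]peerInfo

// get returns the zero-value peerInfo when the ID is unknown,
// so callers can read Height without checking ok first.
func (s store) get(id string) (peerInfo, bool) {
	p, ok := s[id]
	return p, ok
}

func main() {
	s := store{"a": {ID: "a", Height: 3}}

	p, _ := s.get("a")
	fmt.Println(p.Height) // 3

	q, _ := s.get("unknown")
	fmt.Println(q.Height) // 0: the zero value stands in for "unknown peer"
}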
@@ -1799,39 +1799,3 @@ func TestPeerManager_Advertise_Self(t *testing.T) {
 		self,
 	}, peerManager.Advertise(dID, 100))
 }
-
-func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
-	a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
-	b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
-
-	db := dbm.NewMemDB()
-	peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
-	require.NoError(t, err)
-
-	// Getting a height should default to 0, for unknown peers and
-	// for known peers without height.
-	added, err := peerManager.Add(a)
-	require.NoError(t, err)
-	require.True(t, added)
-	require.EqualValues(t, 0, peerManager.GetHeight(a.NodeID))
-	require.EqualValues(t, 0, peerManager.GetHeight(b.NodeID))
-
-	// Setting a height should work for a known node.
-	require.NoError(t, peerManager.SetHeight(a.NodeID, 3))
-	require.EqualValues(t, 3, peerManager.GetHeight(a.NodeID))
-
-	// Setting a height should add an unknown node.
-	require.Equal(t, []types.NodeID{a.NodeID}, peerManager.Peers())
-	require.NoError(t, peerManager.SetHeight(b.NodeID, 7))
-	require.EqualValues(t, 7, peerManager.GetHeight(b.NodeID))
-	require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
-
-	// The heights should not be persisted.
-	peerManager.Close()
-	peerManager, err = p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
-	require.NoError(t, err)
-
-	require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
-	require.Zero(t, peerManager.GetHeight(a.NodeID))
-	require.Zero(t, peerManager.GetHeight(b.NodeID))
-}
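The last three assertions are the subtle part of the deleted test: SetHeight writes through m.store.Set, yet after Close and reopen the peers survive while both heights read zero. That behavior implies the persisted peer record simply omits the Height field, so it reappears as a zero value on reload. A toy sketch of that persist-everything-except-ephemeral-fields pattern, with hypothetical types (the real store encodes protobuf, not JSON):

package main

import (
	"encoding/json"
	"fmt"
)

// peerInfo is the in-memory record, including ephemeral state.
type peerInfo struct {
	ID     string
	Height int64 // ephemeral: deliberately never written to disk
}

// peerRecord is the persisted shape; Height is intentionally absent.
type peerRecord struct {
	ID string `json:"id"`
}

// save encodes only the persisted fields.
func save(p peerInfo) []byte {
	b, _ := json.Marshal(peerRecord{ID: p.ID})
	return b
}

// load rebuilds the in-memory record; Height comes back as its zero value.
func load(b []byte) peerInfo {
	var r peerRecord
	_ = json.Unmarshal(b, &r)
	return peerInfo{ID: r.ID}
}

func main() {
	restored := load(save(peerInfo{ID: "a", Height: 7}))
	fmt.Printf("ID=%s Height=%d\n", restored.ID, restored.Height) // ID=a Height=0
}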
@@ -240,7 +240,6 @@ func createMempoolReactor(
 	reactor := mempoolv0.NewReactor(
 		logger,
 		cfg.Mempool,
-		peerManager,
 		mp,
 		channels[mempool.MempoolChannel],
 		peerUpdates,
@@ -266,7 +265,6 @@ func createMempoolReactor(
 	reactor := mempoolv1.NewReactor(
 		logger,
 		cfg.Mempool,
-		peerManager,
 		mp,
 		channels[mempool.MempoolChannel],
 		peerUpdates,