mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 13:55:17 +00:00
p2p: remove unused get height methods (#8569)
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/tendermint/tendermint/config"
|
"github.com/tendermint/tendermint/config"
|
||||||
"github.com/tendermint/tendermint/internal/libs/clist"
|
"github.com/tendermint/tendermint/internal/libs/clist"
|
||||||
@@ -22,13 +21,6 @@ var (
|
|||||||
_ p2p.Wrapper = (*protomem.Message)(nil)
|
_ p2p.Wrapper = (*protomem.Message)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerManager defines the interface contract required for getting necessary
|
|
||||||
// peer information. This should eventually be replaced with a message-oriented
|
|
||||||
// approach utilizing the p2p stack.
|
|
||||||
type PeerManager interface {
|
|
||||||
GetHeight(types.NodeID) int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reactor implements a service that contains mempool of txs that are broadcasted
|
// Reactor implements a service that contains mempool of txs that are broadcasted
|
||||||
// amongst peers. It maintains a map from peer ID to counter, to prevent gossiping
|
// amongst peers. It maintains a map from peer ID to counter, to prevent gossiping
|
||||||
// txs to the peers you received it from.
|
// txs to the peers you received it from.
|
||||||
@@ -40,9 +32,8 @@ type Reactor struct {
|
|||||||
mempool *TxMempool
|
mempool *TxMempool
|
||||||
ids *IDs
|
ids *IDs
|
||||||
|
|
||||||
getPeerHeight func(types.NodeID) int64
|
peerEvents p2p.PeerEventSubscriber
|
||||||
peerEvents p2p.PeerEventSubscriber
|
chCreator p2p.ChannelCreator
|
||||||
chCreator p2p.ChannelCreator
|
|
||||||
|
|
||||||
// observePanic is a function for observing panics that were recovered in methods on
|
// observePanic is a function for observing panics that were recovered in methods on
|
||||||
// Reactor. observePanic is called with the recovered value.
|
// Reactor. observePanic is called with the recovered value.
|
||||||
@@ -59,18 +50,16 @@ func NewReactor(
|
|||||||
txmp *TxMempool,
|
txmp *TxMempool,
|
||||||
chCreator p2p.ChannelCreator,
|
chCreator p2p.ChannelCreator,
|
||||||
peerEvents p2p.PeerEventSubscriber,
|
peerEvents p2p.PeerEventSubscriber,
|
||||||
getPeerHeight func(types.NodeID) int64,
|
|
||||||
) *Reactor {
|
) *Reactor {
|
||||||
r := &Reactor{
|
r := &Reactor{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
mempool: txmp,
|
mempool: txmp,
|
||||||
ids: NewMempoolIDs(),
|
ids: NewMempoolIDs(),
|
||||||
chCreator: chCreator,
|
chCreator: chCreator,
|
||||||
peerEvents: peerEvents,
|
peerEvents: peerEvents,
|
||||||
getPeerHeight: getPeerHeight,
|
peerRoutines: make(map[types.NodeID]context.CancelFunc),
|
||||||
peerRoutines: make(map[types.NodeID]context.CancelFunc),
|
observePanic: defaultObservePanic,
|
||||||
observePanic: defaultObservePanic,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r.BaseService = *service.NewBaseService(logger, "Mempool", r)
|
r.BaseService = *service.NewBaseService(logger, "Mempool", r)
|
||||||
@@ -327,15 +316,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, m
|
|||||||
|
|
||||||
memTx := nextGossipTx.Value.(*WrappedTx)
|
memTx := nextGossipTx.Value.(*WrappedTx)
|
||||||
|
|
||||||
if r.getPeerHeight != nil {
|
|
||||||
height := r.getPeerHeight(peerID)
|
|
||||||
if height > 0 && height < memTx.height-1 {
|
|
||||||
// allow for a lag of one block
|
|
||||||
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: Transaction batching was disabled due to:
|
// NOTE: Transaction batching was disabled due to:
|
||||||
// https://github.com/tendermint/tendermint/issues/5796
|
// https://github.com/tendermint/tendermint/issues/5796
|
||||||
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
|
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
|
||||||
|
|||||||
@@ -85,7 +85,6 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
|
|||||||
mempool,
|
mempool,
|
||||||
chCreator,
|
chCreator,
|
||||||
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
|
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
|
||||||
rts.network.Nodes[nodeID].PeerManager.GetHeight,
|
|
||||||
)
|
)
|
||||||
rts.nodes = append(rts.nodes, nodeID)
|
rts.nodes = append(rts.nodes, nodeID)
|
||||||
|
|
||||||
|
|||||||
@@ -1027,37 +1027,6 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
|
|||||||
return delay
|
return delay
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHeight returns a peer's height, as reported via SetHeight, or 0 if the
|
|
||||||
// peer or height is unknown.
|
|
||||||
//
|
|
||||||
// FIXME: This is a temporary workaround to share state between the consensus
|
|
||||||
// and mempool reactors, carried over from the legacy P2P stack. Reactors should
|
|
||||||
// not have dependencies on each other, instead tracking this themselves.
|
|
||||||
func (m *PeerManager) GetHeight(peerID types.NodeID) int64 {
|
|
||||||
m.mtx.Lock()
|
|
||||||
defer m.mtx.Unlock()
|
|
||||||
|
|
||||||
peer, _ := m.store.Get(peerID)
|
|
||||||
return peer.Height
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetHeight stores a peer's height, making it available via GetHeight.
|
|
||||||
//
|
|
||||||
// FIXME: This is a temporary workaround to share state between the consensus
|
|
||||||
// and mempool reactors, carried over from the legacy P2P stack. Reactors should
|
|
||||||
// not have dependencies on each other, instead tracking this themselves.
|
|
||||||
func (m *PeerManager) SetHeight(peerID types.NodeID, height int64) error {
|
|
||||||
m.mtx.Lock()
|
|
||||||
defer m.mtx.Unlock()
|
|
||||||
|
|
||||||
peer, ok := m.store.Get(peerID)
|
|
||||||
if !ok {
|
|
||||||
peer = m.newPeerInfo(peerID)
|
|
||||||
}
|
|
||||||
peer.Height = height
|
|
||||||
return m.store.Set(peer)
|
|
||||||
}
|
|
||||||
|
|
||||||
// peerStore stores information about peers. It is not thread-safe, assuming it
|
// peerStore stores information about peers. It is not thread-safe, assuming it
|
||||||
// is only used by PeerManager which handles concurrency control. This allows
|
// is only used by PeerManager which handles concurrency control. This allows
|
||||||
// the manager to execute multiple operations atomically via its own mutex.
|
// the manager to execute multiple operations atomically via its own mutex.
|
||||||
|
|||||||
@@ -1868,38 +1868,3 @@ func TestPeerManager_Advertise_Self(t *testing.T) {
|
|||||||
self,
|
self,
|
||||||
}, peerManager.Advertise(dID, 100))
|
}, peerManager.Advertise(dID, 100))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
|
|
||||||
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
|
|
||||||
b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
|
|
||||||
|
|
||||||
db := dbm.NewMemDB()
|
|
||||||
peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Getting a height should default to 0, for unknown peers and
|
|
||||||
// for known peers without height.
|
|
||||||
added, err := peerManager.Add(a)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, added)
|
|
||||||
require.EqualValues(t, 0, peerManager.GetHeight(a.NodeID))
|
|
||||||
require.EqualValues(t, 0, peerManager.GetHeight(b.NodeID))
|
|
||||||
|
|
||||||
// Setting a height should work for a known node.
|
|
||||||
require.NoError(t, peerManager.SetHeight(a.NodeID, 3))
|
|
||||||
require.EqualValues(t, 3, peerManager.GetHeight(a.NodeID))
|
|
||||||
|
|
||||||
// Setting a height should add an unknown node.
|
|
||||||
require.Equal(t, []types.NodeID{a.NodeID}, peerManager.Peers())
|
|
||||||
require.NoError(t, peerManager.SetHeight(b.NodeID, 7))
|
|
||||||
require.EqualValues(t, 7, peerManager.GetHeight(b.NodeID))
|
|
||||||
require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
|
|
||||||
|
|
||||||
// The heights should not be persisted.
|
|
||||||
peerManager, err = p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
|
|
||||||
require.Zero(t, peerManager.GetHeight(a.NodeID))
|
|
||||||
require.Zero(t, peerManager.GetHeight(b.NodeID))
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -266,7 +266,7 @@ func makeNode(
|
|||||||
node.evPool = evPool
|
node.evPool = evPool
|
||||||
|
|
||||||
mpReactor, mp := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
|
mpReactor, mp := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
|
||||||
peerManager.Subscribe, node.router.OpenChannel, peerManager.GetHeight)
|
peerManager.Subscribe, node.router.OpenChannel)
|
||||||
node.rpcEnv.Mempool = mp
|
node.rpcEnv.Mempool = mp
|
||||||
node.services = append(node.services, mpReactor)
|
node.services = append(node.services, mpReactor)
|
||||||
|
|
||||||
|
|||||||
@@ -147,7 +147,6 @@ func createMempoolReactor(
|
|||||||
memplMetrics *mempool.Metrics,
|
memplMetrics *mempool.Metrics,
|
||||||
peerEvents p2p.PeerEventSubscriber,
|
peerEvents p2p.PeerEventSubscriber,
|
||||||
chCreator p2p.ChannelCreator,
|
chCreator p2p.ChannelCreator,
|
||||||
peerHeight func(types.NodeID) int64,
|
|
||||||
) (service.Service, mempool.Mempool) {
|
) (service.Service, mempool.Mempool) {
|
||||||
logger = logger.With("module", "mempool")
|
logger = logger.With("module", "mempool")
|
||||||
|
|
||||||
@@ -166,7 +165,6 @@ func createMempoolReactor(
|
|||||||
mp,
|
mp,
|
||||||
chCreator,
|
chCreator,
|
||||||
peerEvents,
|
peerEvents,
|
||||||
peerHeight,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if cfg.Consensus.WaitForTxs() {
|
if cfg.Consensus.WaitForTxs() {
|
||||||
|
|||||||
Reference in New Issue
Block a user