mempool: fix reactor tests (#5967)

## Description

Update the faux router to either drop channel errors or handle them based on an argument. This prevents deadlocks in tests where we try to send an error on the mempool channel but there is no reader.

Closes: #5956
This commit is contained in:
Aleksandr Bezobchuk
2021-01-25 11:59:18 -05:00
committed by GitHub
parent aecfb0ecf0
commit 9e158839f6

View File

@@ -93,7 +93,14 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, logger log.Logger, chBuf uint)
return rts
}
func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*reactorTestSuite, numOut int) {
func simulateRouter(
wg *sync.WaitGroup,
primary *reactorTestSuite,
suites []*reactorTestSuite,
numOut int,
dropChErr bool,
) {
wg.Add(1)
// create a mapping for efficient suite lookup by peer ID
@@ -118,6 +125,19 @@ func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*rea
wg.Done()
}()
go func() {
for pErr := range primary.mempoolPeerErrCh {
if dropChErr {
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
} else {
primary.peerUpdatesCh <- p2p.PeerUpdate{
PeerID: pErr.PeerID,
Status: p2p.PeerStatusRemoved,
}
}
}
}()
}
func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) {
@@ -166,7 +186,7 @@ func TestReactorBroadcastTxs(t *testing.T) {
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numTxs*len(secondaries))
simulateRouter(wg, primary, testSuites, numTxs*len(secondaries), true)
txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID)
@@ -318,7 +338,7 @@ func TestReactor_MaxTxBytes(t *testing.T) {
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, 1)
simulateRouter(wg, primary, testSuites, 1, true)
// Broadcast a tx, which has the max size and ensure it's received by the
// second reactor.
@@ -356,10 +376,17 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
reactor := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0)
go func() {
// drop all messages on the mempool channel
for range reactor.mempoolOutCh {
}
}()
go func() {
// drop all errors on the mempool channel
for range reactor.mempoolPeerErrCh {
}
}()
peerID, err := p2p.NewNodeID("00ffaa")
require.NoError(t, err)
@@ -410,6 +437,12 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
primary := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0)
secondary := setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0)
go func() {
// drop all errors on the mempool channel
for range primary.mempoolPeerErrCh {
}
}()
// connect peer
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,