mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-05 03:20:44 +00:00
This is (#8446) pulled from the `main/libp2p` branch but without any of the libp2p content, and is perhaps the easiest first step to enable pluggability at the peer layer, and makes it possible hoist shims (including for, say 0.34) into tendermint without touching the reactors.
422 lines
10 KiB
Go
422 lines
10 KiB
Go
package mempool
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/fortytw2/leaktest"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
abciclient "github.com/tendermint/tendermint/abci/client"
|
|
"github.com/tendermint/tendermint/abci/example/kvstore"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/internal/p2p"
|
|
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
tmrand "github.com/tendermint/tendermint/libs/rand"
|
|
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
type reactorTestSuite struct {
|
|
network *p2ptest.Network
|
|
logger log.Logger
|
|
|
|
reactors map[types.NodeID]*Reactor
|
|
mempoolChannels map[types.NodeID]p2p.Channel
|
|
mempools map[types.NodeID]*TxMempool
|
|
kvstores map[types.NodeID]*kvstore.Application
|
|
|
|
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
|
peerUpdates map[types.NodeID]*p2p.PeerUpdates
|
|
|
|
nodes []types.NodeID
|
|
}
|
|
|
|
func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNodes int, chBuf uint) *reactorTestSuite {
|
|
t.Helper()
|
|
|
|
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { os.RemoveAll(cfg.RootDir) })
|
|
|
|
rts := &reactorTestSuite{
|
|
logger: log.NewNopLogger().With("testCase", t.Name()),
|
|
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
|
|
reactors: make(map[types.NodeID]*Reactor, numNodes),
|
|
mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes),
|
|
mempools: make(map[types.NodeID]*TxMempool, numNodes),
|
|
kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
|
|
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
|
|
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
|
}
|
|
|
|
chDesc := getChannelDescriptor(cfg.Mempool)
|
|
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)
|
|
|
|
for nodeID := range rts.network.Nodes {
|
|
rts.kvstores[nodeID] = kvstore.NewApplication()
|
|
|
|
client := abciclient.NewLocalClient(logger, rts.kvstores[nodeID])
|
|
require.NoError(t, client.Start(ctx))
|
|
t.Cleanup(client.Wait)
|
|
|
|
mempool := setup(t, client, 0)
|
|
rts.mempools[nodeID] = mempool
|
|
|
|
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
|
|
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
|
|
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
|
|
|
|
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
|
|
return rts.mempoolChannels[nodeID], nil
|
|
}
|
|
|
|
rts.reactors[nodeID] = NewReactor(
|
|
rts.logger.With("nodeID", nodeID),
|
|
cfg.Mempool,
|
|
mempool,
|
|
chCreator,
|
|
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
|
|
)
|
|
rts.nodes = append(rts.nodes, nodeID)
|
|
|
|
require.NoError(t, rts.reactors[nodeID].Start(ctx))
|
|
require.True(t, rts.reactors[nodeID].IsRunning())
|
|
}
|
|
|
|
require.Len(t, rts.reactors, numNodes)
|
|
|
|
t.Cleanup(func() {
|
|
for nodeID := range rts.reactors {
|
|
if rts.reactors[nodeID].IsRunning() {
|
|
rts.reactors[nodeID].Stop()
|
|
rts.reactors[nodeID].Wait()
|
|
require.False(t, rts.reactors[nodeID].IsRunning())
|
|
}
|
|
|
|
}
|
|
})
|
|
|
|
t.Cleanup(leaktest.Check(t))
|
|
|
|
return rts
|
|
}
|
|
|
|
func (rts *reactorTestSuite) start(ctx context.Context, t *testing.T) {
|
|
t.Helper()
|
|
rts.network.Start(ctx, t)
|
|
|
|
require.Len(t,
|
|
rts.network.RandomNode().PeerManager.Peers(),
|
|
len(rts.nodes)-1,
|
|
"network does not have expected number of nodes")
|
|
}
|
|
|
|
func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...types.NodeID) {
|
|
t.Helper()
|
|
|
|
// ensure that the transactions get fully broadcast to the
|
|
// rest of the network
|
|
wg := &sync.WaitGroup{}
|
|
for name, pool := range rts.mempools {
|
|
if !p2ptest.NodeInSlice(name, ids) {
|
|
continue
|
|
}
|
|
if len(txs) == pool.Size() {
|
|
continue
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(name types.NodeID, pool *TxMempool) {
|
|
defer wg.Done()
|
|
require.Eventually(t, func() bool { return len(txs) == pool.Size() },
|
|
time.Minute,
|
|
250*time.Millisecond,
|
|
"node=%q, ntx=%d, size=%d", name, len(txs), pool.Size(),
|
|
)
|
|
}(name, pool)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestReactorBroadcastDoesNotPanic(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
const numNodes = 2
|
|
|
|
logger := log.NewNopLogger()
|
|
rts := setupReactors(ctx, t, logger, numNodes, 0)
|
|
|
|
observePanic := func(r interface{}) {
|
|
t.Fatal("panic detected in reactor")
|
|
}
|
|
|
|
primary := rts.nodes[0]
|
|
secondary := rts.nodes[1]
|
|
primaryReactor := rts.reactors[primary]
|
|
primaryMempool := primaryReactor.mempool
|
|
secondaryReactor := rts.reactors[secondary]
|
|
|
|
primaryReactor.observePanic = observePanic
|
|
secondaryReactor.observePanic = observePanic
|
|
|
|
firstTx := &WrappedTx{}
|
|
primaryMempool.insertTx(firstTx)
|
|
|
|
// run the router
|
|
rts.start(ctx, t)
|
|
|
|
go primaryReactor.broadcastTxRoutine(ctx, secondary, rts.mempoolChannels[primary])
|
|
|
|
wg := &sync.WaitGroup{}
|
|
for i := 0; i < 50; i++ {
|
|
next := &WrappedTx{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
primaryMempool.insertTx(next)
|
|
}()
|
|
}
|
|
|
|
primaryReactor.Stop()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestReactorBroadcastTxs(t *testing.T) {
|
|
numTxs := 512
|
|
numNodes := 4
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
logger := log.NewNopLogger()
|
|
|
|
rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs))
|
|
|
|
primary := rts.nodes[0]
|
|
secondaries := rts.nodes[1:]
|
|
|
|
txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, UnknownPeerID)
|
|
|
|
require.Equal(t, numTxs, rts.reactors[primary].mempool.Size())
|
|
|
|
rts.start(ctx, t)
|
|
|
|
// Wait till all secondary suites (reactor) received all mempool txs from the
|
|
// primary suite (node).
|
|
rts.waitForTxns(t, convertTex(txs), secondaries...)
|
|
}
|
|
|
|
// regression test for https://github.com/tendermint/tendermint/issues/5408
|
|
func TestReactorConcurrency(t *testing.T) {
|
|
numTxs := 10
|
|
numNodes := 2
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
logger := log.NewNopLogger()
|
|
rts := setupReactors(ctx, t, logger, numNodes, 0)
|
|
|
|
primary := rts.nodes[0]
|
|
secondary := rts.nodes[1]
|
|
|
|
rts.start(ctx, t)
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for i := 0; i < runtime.NumCPU()*2; i++ {
|
|
wg.Add(2)
|
|
|
|
// 1. submit a bunch of txs
|
|
// 2. update the whole mempool
|
|
|
|
txs := checkTxs(ctx, t, rts.reactors[primary].mempool, numTxs, UnknownPeerID)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
mempool := rts.mempools[primary]
|
|
|
|
mempool.Lock()
|
|
defer mempool.Unlock()
|
|
|
|
deliverTxResponses := make([]*abci.ExecTxResult, len(txs))
|
|
for i := range txs {
|
|
deliverTxResponses[i] = &abci.ExecTxResult{Code: 0}
|
|
}
|
|
|
|
require.NoError(t, mempool.Update(ctx, 1, convertTex(txs), deliverTxResponses, nil, nil, true))
|
|
}()
|
|
|
|
// 1. submit a bunch of txs
|
|
// 2. update none
|
|
_ = checkTxs(ctx, t, rts.reactors[secondary].mempool, numTxs, UnknownPeerID)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
mempool := rts.mempools[secondary]
|
|
|
|
mempool.Lock()
|
|
defer mempool.Unlock()
|
|
|
|
err := mempool.Update(ctx, 1, []types.Tx{}, make([]*abci.ExecTxResult, 0), nil, nil, true)
|
|
require.NoError(t, err)
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestReactorNoBroadcastToSender(t *testing.T) {
|
|
numTxs := 1000
|
|
numNodes := 2
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
logger := log.NewNopLogger()
|
|
rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs))
|
|
|
|
primary := rts.nodes[0]
|
|
secondary := rts.nodes[1]
|
|
|
|
peerID := uint16(1)
|
|
_ = checkTxs(ctx, t, rts.mempools[primary], numTxs, peerID)
|
|
|
|
rts.start(ctx, t)
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
require.Eventually(t, func() bool {
|
|
return rts.mempools[secondary].Size() == 0
|
|
}, time.Minute, 100*time.Millisecond)
|
|
}
|
|
|
|
func TestReactor_MaxTxBytes(t *testing.T) {
|
|
numNodes := 2
|
|
cfg := config.TestConfig()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
logger := log.NewNopLogger()
|
|
|
|
rts := setupReactors(ctx, t, logger, numNodes, 0)
|
|
|
|
primary := rts.nodes[0]
|
|
secondary := rts.nodes[1]
|
|
|
|
// Broadcast a tx, which has the max size and ensure it's received by the
|
|
// second reactor.
|
|
tx1 := tmrand.Bytes(cfg.Mempool.MaxTxBytes)
|
|
err := rts.reactors[primary].mempool.CheckTx(
|
|
ctx,
|
|
tx1,
|
|
nil,
|
|
TxInfo{
|
|
SenderID: UnknownPeerID,
|
|
},
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
rts.start(ctx, t)
|
|
|
|
rts.reactors[primary].mempool.Flush()
|
|
rts.reactors[secondary].mempool.Flush()
|
|
|
|
// broadcast a tx, which is beyond the max size and ensure it's not sent
|
|
tx2 := tmrand.Bytes(cfg.Mempool.MaxTxBytes + 1)
|
|
err = rts.mempools[primary].CheckTx(ctx, tx2, nil, TxInfo{SenderID: UnknownPeerID})
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
|
// we're creating a single node network, but not starting the
|
|
// network.
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
logger := log.NewNopLogger()
|
|
rts := setupReactors(ctx, t, logger, 1, MaxActiveIDs+1)
|
|
|
|
nodeID := rts.nodes[0]
|
|
|
|
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
|
|
require.NoError(t, err)
|
|
|
|
// ensure the reactor does not panic (i.e. exhaust active IDs)
|
|
for i := 0; i < MaxActiveIDs+1; i++ {
|
|
rts.peerChans[nodeID] <- p2p.PeerUpdate{
|
|
Status: p2p.PeerStatusUp,
|
|
NodeID: peerID,
|
|
}
|
|
|
|
require.NoError(t, rts.mempoolChannels[nodeID].Send(ctx, p2p.Envelope{
|
|
To: peerID,
|
|
Message: &protomem.Txs{
|
|
Txs: [][]byte{},
|
|
},
|
|
}))
|
|
}
|
|
}
|
|
|
|
func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping test in short mode")
|
|
}
|
|
|
|
// 0 is already reserved for UnknownPeerID
|
|
ids := NewMempoolIDs()
|
|
|
|
for i := 0; i < MaxActiveIDs-1; i++ {
|
|
peerID, err := types.NewNodeID(fmt.Sprintf("%040d", i))
|
|
require.NoError(t, err)
|
|
ids.ReserveForPeer(peerID)
|
|
}
|
|
|
|
peerID, err := types.NewNodeID(fmt.Sprintf("%040d", MaxActiveIDs-1))
|
|
require.NoError(t, err)
|
|
require.Panics(t, func() {
|
|
ids.ReserveForPeer(peerID)
|
|
})
|
|
}
|
|
|
|
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping test in short mode")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
logger := log.NewNopLogger()
|
|
|
|
rts := setupReactors(ctx, t, logger, 2, 2)
|
|
|
|
primary := rts.nodes[0]
|
|
secondary := rts.nodes[1]
|
|
|
|
rts.start(ctx, t)
|
|
|
|
// disconnect peer
|
|
rts.peerChans[primary] <- p2p.PeerUpdate{
|
|
Status: p2p.PeerStatusDown,
|
|
NodeID: secondary,
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
txs := checkTxs(ctx, t, rts.reactors[primary].mempool, 4, UnknownPeerID)
|
|
require.Equal(t, 4, len(txs))
|
|
require.Equal(t, 4, rts.mempools[primary].Size())
|
|
require.Equal(t, 0, rts.mempools[secondary].Size())
|
|
}
|