diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go
index 9deb7aace..3014e0519 100644
--- a/internal/mempool/v1/reactor.go
+++ b/internal/mempool/v1/reactor.go
@@ -54,6 +54,10 @@ type Reactor struct {
 	// goroutines.
 	peerWG sync.WaitGroup
 
+	// observePanic is a function for observing panics that were recovered in methods on
+	// Reactor. observePanic is called with the recovered value.
+	observePanic func(interface{})
+
 	mtx          tmsync.Mutex
 	peerRoutines map[types.NodeID]*tmsync.Closer
 }
@@ -77,12 +81,15 @@ func NewReactor(
 		peerUpdates:  peerUpdates,
 		closeCh:      make(chan struct{}),
 		peerRoutines: make(map[types.NodeID]*tmsync.Closer),
+		observePanic: defaultObservePanic,
 	}
 
 	r.BaseService = *service.NewBaseService(logger, "Mempool", r)
 	return r
 }
 
+func defaultObservePanic(r interface{}) {}
+
 // GetChannelShims returns a map of ChannelDescriptorShim objects, where each
 // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
 // p2p proto.Message the new p2p Channel is responsible for handling.
@@ -188,6 +195,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {
 func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
 	defer func() {
 		if e := recover(); e != nil {
+			r.observePanic(e)
 			err = fmt.Errorf("panic in processing message: %v", e)
 			r.Logger.Error(
 				"recovering from processing message panic",
@@ -318,6 +326,7 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
 		r.peerWG.Done()
 
 		if e := recover(); e != nil {
+			r.observePanic(e)
 			r.Logger.Error(
 				"recovering from broadcasting mempool loop",
 				"err", e,
diff --git a/internal/mempool/v1/reactor_test.go b/internal/mempool/v1/reactor_test.go
new file mode 100644
index 000000000..5934d534c
--- /dev/null
+++ b/internal/mempool/v1/reactor_test.go
@@ -0,0 +1,147 @@
+package v1
+
+import (
+	"os"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tendermint/tendermint/abci/example/kvstore"
+	"github.com/tendermint/tendermint/config"
+	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
+	"github.com/tendermint/tendermint/internal/mempool"
+	"github.com/tendermint/tendermint/internal/p2p"
+	"github.com/tendermint/tendermint/internal/p2p/p2ptest"
+	"github.com/tendermint/tendermint/libs/log"
+	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
+	"github.com/tendermint/tendermint/types"
+)
+
+type reactorTestSuite struct {
+	network *p2ptest.Network
+	logger  log.Logger
+
+	reactors        map[types.NodeID]*Reactor
+	mempoolChannels map[types.NodeID]*p2p.Channel
+	mempools        map[types.NodeID]*TxMempool
+	kvstores        map[types.NodeID]*kvstore.Application
+
+	peerChans   map[types.NodeID]chan p2p.PeerUpdate
+	peerUpdates map[types.NodeID]*p2p.PeerUpdates
+
+	nodes []types.NodeID
+}
+
+func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
+	t.Helper()
+
+	cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
+	t.Cleanup(func() {
+		os.RemoveAll(cfg.RootDir)
+	})
+
+	rts := &reactorTestSuite{
+		logger:          log.TestingLogger().With("testCase", t.Name()),
+		network:         p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
+		reactors:        make(map[types.NodeID]*Reactor, numNodes),
+		mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
+		mempools:        make(map[types.NodeID]*TxMempool, numNodes),
+		kvstores:        make(map[types.NodeID]*kvstore.Application, numNodes),
+		peerChans:       make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
+		peerUpdates:     make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
+	}
+
+	chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)}
+	rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
+
+	for nodeID := range rts.network.Nodes {
+		rts.kvstores[nodeID] = kvstore.NewApplication()
+
+		mempool := setup(t, 0)
+		rts.mempools[nodeID] = mempool
+
+		rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
+		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
+		rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
+
+		rts.reactors[nodeID] = NewReactor(
+			rts.logger.With("nodeID", nodeID),
+			cfg.Mempool,
+			rts.network.Nodes[nodeID].PeerManager,
+			mempool,
+			rts.mempoolChannels[nodeID],
+			rts.peerUpdates[nodeID],
+		)
+
+		rts.nodes = append(rts.nodes, nodeID)
+
+		require.NoError(t, rts.reactors[nodeID].Start())
+		require.True(t, rts.reactors[nodeID].IsRunning())
+	}
+
+	require.Len(t, rts.reactors, numNodes)
+
+	t.Cleanup(func() {
+		for nodeID := range rts.reactors {
+			if rts.reactors[nodeID].IsRunning() {
+				require.NoError(t, rts.reactors[nodeID].Stop())
+				require.False(t, rts.reactors[nodeID].IsRunning())
+			}
+		}
+	})
+
+	return rts
+}
+
+func (rts *reactorTestSuite) start(t *testing.T) {
+	t.Helper()
+	rts.network.Start(t)
+	require.Len(t,
+		rts.network.RandomNode().PeerManager.Peers(),
+		len(rts.nodes)-1,
+		"network does not have expected number of nodes")
+}
+
+func TestReactorBroadcastDoesNotPanic(t *testing.T) {
+	numNodes := 2
+	rts := setupReactors(t, numNodes, 0)
+
+	observePanic := func(r interface{}) {
+		t.Fatal("panic detected in reactor")
+	}
+
+	primary := rts.nodes[0]
+	secondary := rts.nodes[1]
+	primaryReactor := rts.reactors[primary]
+	primaryMempool := primaryReactor.mempool
+	secondaryReactor := rts.reactors[secondary]
+
+	primaryReactor.observePanic = observePanic
+	secondaryReactor.observePanic = observePanic
+
+	firstTx := &WrappedTx{}
+	primaryMempool.insertTx(firstTx)
+
+	// run the router
+	rts.start(t)
+
+	closer := tmsync.NewCloser()
+	primaryReactor.peerWG.Add(1)
+	go primaryReactor.broadcastTxRoutine(secondary, closer)
+
+	wg := &sync.WaitGroup{}
+	for i := 0; i < 50; i++ {
+		next := &WrappedTx{}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			primaryMempool.insertTx(next)
+		}()
+	}
+
+	err := primaryReactor.Stop()
+	require.NoError(t, err)
+	primaryReactor.peerWG.Wait()
+	wg.Wait()
}
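
Note (not part of the patch): the change above follows a recover-then-observe pattern: a deferred recover reports the recovered value to a hook (defaultObservePanic is a no-op, and the test swaps in a hook that fails the test) before converting the panic into an error or log entry. The following is a minimal, standalone Go sketch of that pattern only; the names worker, defaultObserve, and the example wiring are illustrative assumptions and are not part of the Tendermint codebase.

    package main

    import (
    	"fmt"
    	"log"
    )

    // defaultObserve is a no-op hook, analogous to defaultObservePanic above.
    func defaultObserve(interface{}) {}

    // worker recovers from panics in fn, passes the recovered value to observe
    // first, and then converts the panic into an error for the caller.
    func worker(fn func(), observe func(interface{})) (err error) {
    	defer func() {
    		if e := recover(); e != nil {
    			observe(e) // the hook sees the recovered value before it is rewrapped
    			err = fmt.Errorf("panic in worker: %v", e)
    		}
    	}()
    	fn()
    	return nil
    }

    func main() {
    	// Production-style wiring: a hook that only records the panic.
    	err := worker(func() { panic("boom") }, func(r interface{}) {
    		log.Printf("recovered panic: %v", r)
    	})
    	fmt.Println("worker returned:", err)

    	// Default wiring: the no-op hook, as NewReactor installs above.
    	_ = worker(func() {}, defaultObserve)
    }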