diff --git a/mempool/v1/reactor_test.go b/mempool/v1/reactor_test.go index 3f7c3df46..19fd003a7 100644 --- a/mempool/v1/reactor_test.go +++ b/mempool/v1/reactor_test.go @@ -1,146 +1,185 @@ package v1 -// import ( -// "os" -// "strings" -// "sync" -// "testing" +import ( + "encoding/hex" + "os" + "sync" + "testing" + "time" -// "github.com/stretchr/testify/require" -// "github.com/tendermint/tendermint/abci/example/kvstore" -// "github.com/tendermint/tendermint/config" -// "github.com/tendermint/tendermint/libs/log" -// tmsync "github.com/tendermint/tendermint/libs/sync" -// "github.com/tendermint/tendermint/mempool" -// "github.com/tendermint/tendermint/p2p" + "github.com/go-kit/log/term" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" -// // "github.com/tendermint/tendermint/p2p" -// protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" -// "github.com/tendermint/tendermint/types" -// ) + "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/config" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + memproto "github.com/tendermint/tendermint/proto/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) -// type reactorTestSuite struct { -// network *p2ptest.Network -// logger log.Logger +const ( + numTxs = 1000 + timeout = 120 * time.Second // ridiculously high because CircleCI is slow +) -// reactors map[types.NodeID]*Reactor -// mempoolChannels map[types.NodeID]*p2p.Channel -// mempools map[types.NodeID]*TxMempool -// kvstores map[types.NodeID]*kvstore.Application +type peerState struct { + height int64 +} -// peerChans map[types.NodeID]chan p2p.PeerUpdate -// peerUpdates map[types.NodeID]*p2p.PeerUpdates +func (ps peerState) GetHeight() int64 { + return ps.height +} -// nodes []types.NodeID -// } +// Send a bunch of txs to the first reactor's mempool and wait for them all to +// be received in the others. +func TestReactorBroadcastTxsMessage(t *testing.T) { + config := cfg.TestConfig() + // if there were more than two reactors, the order of transactions could not be + // asserted in waitForTxsOnReactors (due to transactions gossiping). If we + // replace Connect2Switches (full mesh) with a func, which connects first + // reactor to others and nothing else, this test should also pass with >2 reactors. + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } -// func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite { -// t.Helper() + txs := checkTxs(t, reactors[0].mempool, numTxs, mempool.UnknownPeerID) + transactions := make(types.Txs, len(txs)) + for idx, tx := range txs { + transactions[idx] = tx.tx + } -// cfg, err := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) -// require.NoError(t, err) -// t.Cleanup(func() { os.RemoveAll(cfg.RootDir) }) + waitForTxsOnReactors(t, transactions, reactors) +} -// rts := &reactorTestSuite{ -// logger: log.TestingLogger().With("testCase", t.Name()), -// network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}), -// reactors: make(map[types.NodeID]*Reactor, numNodes), -// mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes), -// mempools: make(map[types.NodeID]*TxMempool, numNodes), -// kvstores: make(map[types.NodeID]*kvstore.Application, numNodes), -// peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), -// peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), -// } +func TestMempoolVectors(t *testing.T) { + testCases := []struct { + testName string + tx []byte + expBytes string + }{ + {"tx 1", []byte{123}, "0a030a017b"}, + {"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"}, + } -// chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)} -// rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) + for _, tc := range testCases { + tc := tc -// for nodeID := range rts.network.Nodes { -// rts.kvstores[nodeID] = kvstore.NewApplication() + msg := memproto.Message{ + Sum: &memproto.Message_Txs{ + Txs: &memproto.Txs{Txs: [][]byte{tc.tx}}, + }, + } + bz, err := msg.Marshal() + require.NoError(t, err, tc.testName) -// mempool := setup(t, 0) -// rts.mempools[nodeID] = mempool + require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName) + } +} -// rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) -// rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) -// rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID]) +func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { + reactors := make([]*Reactor, n) + logger := mempoolLogger() + for i := 0; i < n; i++ { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() -// rts.reactors[nodeID] = NewReactor( -// rts.logger.With("nodeID", nodeID), -// cfg.Mempool, -// mempool, -// rts.mempoolChannels[nodeID], -// rts.peerUpdates[nodeID], -// ) + reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i].SetLogger(logger.With("validator", i)) + } -// rts.nodes = append(rts.nodes, nodeID) + p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("MEMPOOL", reactors[i]) + return s -// require.NoError(t, rts.reactors[nodeID].Start()) -// require.True(t, rts.reactors[nodeID].IsRunning()) -// } + }, p2p.Connect2Switches) + return reactors +} -// require.Len(t, rts.reactors, numNodes) +// mempoolLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func mempoolLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} -// t.Cleanup(func() { -// for nodeID := range rts.reactors { -// if rts.reactors[nodeID].IsRunning() { -// require.NoError(t, rts.reactors[nodeID].Stop()) -// require.False(t, rts.reactors[nodeID].IsRunning()) -// } -// } -// }) +func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, func()) { + conf := config.ResetTestRoot("mempool_test") -// return rts -// } + mp, cu := newMempoolWithAppAndConfig(cc, conf) + return mp, cu +} -// func (rts *reactorTestSuite) start(t *testing.T) { -// t.Helper() -// rts.network.Start(t) -// require.Len(t, -// rts.network.RandomNode().PeerManager.Peers(), -// len(rts.nodes)-1, -// "network does not have expected number of nodes") -// } +func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*TxMempool, func()) { + appConnMem, _ := cc.NewABCIClient() + appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) + err := appConnMem.Start() + if err != nil { + panic(err) + } -// func TestReactorBroadcastDoesNotPanic(t *testing.T) { -// numNodes := 2 -// rts := setupReactors(t, numNodes, 0) + mp := NewTxMempool(log.TestingLogger(), cfg.Mempool, appConnMem, 0) -// observePanic := func(r interface{}) { -// t.Fatal("panic detected in reactor") -// } + return mp, func() { os.RemoveAll(cfg.RootDir) } +} -// primary := rts.nodes[0] -// secondary := rts.nodes[1] -// primaryReactor := rts.reactors[primary] -// primaryMempool := primaryReactor.mempool -// secondaryReactor := rts.reactors[secondary] +func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { + // wait for the txs in all mempools + wg := new(sync.WaitGroup) + for i, reactor := range reactors { + wg.Add(1) + go func(r *Reactor, reactorIndex int) { + defer wg.Done() + waitForTxsOnReactor(t, txs, r, reactorIndex) + }(reactor, i) + } -// primaryReactor.observePanic = observePanic -// secondaryReactor.observePanic = observePanic + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() -// firstTx := &WrappedTx{} -// primaryMempool.insertTx(firstTx) + timer := time.After(timeout) + select { + case <-timer: + t.Fatal("Timed out waiting for txs") + case <-done: + } +} -// // run the router -// rts.start(t) +func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { + mempool := reactor.mempool + for mempool.Size() < len(txs) { + time.Sleep(time.Millisecond * 100) + } -// closer := tmsync.NewCloser() -// primaryReactor.peerWG.Add(1) -// go primaryReactor.broadcastTxRoutine(secondary, closer) - -// wg := &sync.WaitGroup{} -// for i := 0; i < 50; i++ { -// next := &WrappedTx{} -// wg.Add(1) -// go func() { -// defer wg.Done() -// primaryMempool.insertTx(next) -// }() -// } - -// err := primaryReactor.Stop() -// require.NoError(t, err) -// primaryReactor.peerWG.Wait() -// wg.Wait() -// } + reapedTxs := mempool.ReapMaxTxs(len(txs)) + for i, tx := range txs { + assert.Equalf(t, tx, reapedTxs[i], + "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) + } +}