I found this issue while trying to decouple the mempool reactor from its underlying clist implementation.

According to its comment, the test `TestReactorNoBroadcastToSender` is intended to verify that a tx is never sent back to the peer it came from. However, the test never initializes the peer state key, so `broadcastTxRoutine` gets stuck waiting for the peer to catch up in state *instead of* ever reaching the code path that skips sending the tx back to its sender:

b8d08b9ef4/mempool/reactor.go (L216-L226)

This PR fixes the issue by initializing the peer state key in the test.
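Concretely, the fix mirrors the peer-state initialization that `TestReactorBroadcastTxMessage` already performs (see the test file below): every connected peer is given a `peerState` with a non-zero height under `types.PeerStateKey`, so the broadcast routine sees a caught-up peer and proceeds to the skip-the-sender check instead of sleeping in a loop.

```go
// Initialize the peer state key on every peer of every reactor, as in
// TestReactorBroadcastTxMessage; peerState{1} reports height 1 via GetHeight().
for _, r := range reactors {
	for _, peer := range r.Switch.Peers().List() {
		peer.Set(types.PeerStateKey, peerState{1})
	}
}
```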
package mempool

import (
	"encoding/hex"
	"errors"
	"net"
	"sync"
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
	"github.com/go-kit/kit/log/term"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/mock"
	memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)

const (
	numTxs  = 1000
	timeout = 120 * time.Second // ridiculously high because CircleCI is slow
)

type peerState struct {
	height int64
}

func (ps peerState) GetHeight() int64 {
	return ps.height
}

// Send a bunch of txs to the first reactor's mempool and wait for them all to
// be received in the others.
func TestReactorBroadcastTxMessage(t *testing.T) {
	config := cfg.TestConfig()
	// if there were more than two reactors, the order of transactions could not be
	// asserted in waitForTxsOnReactors (due to transactions gossiping). If we
	// replace Connect2Switches (full mesh) with a func, which connects first
	// reactor to others and nothing else, this test should also pass with >2 reactors.
	const N = 2
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			if err := r.Stop(); err != nil {
				assert.NoError(t, err)
			}
		}
	}()
	for _, r := range reactors {
		for _, peer := range r.Switch.Peers().List() {
			peer.Set(types.PeerStateKey, peerState{1})
		}
	}

	txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID)
	waitForTxsOnReactors(t, txs, reactors)
}

// Send a bunch of txs to the first reactor's mempool, claiming they came from
// the peer, and ensure that peer does not get the txs back.
func TestReactorNoBroadcastToSender(t *testing.T) {
	config := cfg.TestConfig()
	const N = 2
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			if err := r.Stop(); err != nil {
				assert.NoError(t, err)
			}
		}
	}()
	for _, r := range reactors {
		for _, peer := range r.Switch.Peers().List() {
			peer.Set(types.PeerStateKey, peerState{1})
		}
	}

	const peerID = 1
	checkTxs(t, reactors[0].mempool, numTxs, peerID)
	ensureNoTxs(t, reactors[peerID], 100*time.Millisecond)
}

func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	config := cfg.TestConfig()
	const N = 2
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			if err := r.Stop(); err != nil {
				assert.NoError(t, err)
			}
		}
	}()

	// stop peer
	sw := reactors[1].Switch
	sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))

	// check that we are not leaking any go-routines
	// i.e. broadcastTxRoutine finishes when peer is stopped
	leaktest.CheckTimeout(t, 10*time.Second)()
}

func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	config := cfg.TestConfig()
	const N = 2
	reactors := makeAndConnectReactors(config, N)

	// stop reactors
	for _, r := range reactors {
		if err := r.Stop(); err != nil {
			assert.NoError(t, err)
		}
	}

	// check that we are not leaking any go-routines
	// i.e. broadcastTxRoutine finishes when reactor is stopped
	leaktest.CheckTimeout(t, 10*time.Second)()
}

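// TestMempoolIDsBasic exercises reserving, querying, and reclaiming a mempool
// peer ID for a single peer.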
func TestMempoolIDsBasic(t *testing.T) {
	ids := newMempoolIDs()

	peer := mock.NewPeer(net.IP{127, 0, 0, 1})

	ids.ReserveForPeer(peer)
	assert.EqualValues(t, 1, ids.GetForPeer(peer))
	ids.Reclaim(peer)

	ids.ReserveForPeer(peer)
	assert.EqualValues(t, 2, ids.GetForPeer(peer))
	ids.Reclaim(peer)
}

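// TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs checks that reserving
// more than maxActiveIDs peer IDs panics.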
func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
	if testing.Short() {
		return
	}

	// 0 is already reserved for UnknownPeerID
	ids := newMempoolIDs()

	for i := 0; i < maxActiveIDs-1; i++ {
		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
		ids.ReserveForPeer(peer)
	}

	assert.Panics(t, func() {
		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
		ids.ReserveForPeer(peer)
	})
}

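// TestDontExhaustMaxActiveIDs feeds the reactor messages from more than
// maxActiveIDs peers to make sure the pool of peer IDs is not exhausted.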
func TestDontExhaustMaxActiveIDs(t *testing.T) {
	config := cfg.TestConfig()
	const N = 1
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			if err := r.Stop(); err != nil {
				assert.NoError(t, err)
			}
		}
	}()
	reactor := reactors[0]

	for i := 0; i < maxActiveIDs+1; i++ {
		peer := mock.NewPeer(nil)
		reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
		reactor.AddPeer(peer)
	}
}

// mempoolLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func mempoolLogger() log.Logger {
	return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
		for i := 0; i < len(keyvals)-1; i += 2 {
			if keyvals[i] == "validator" {
				return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
			}
		}
		return term.FgBgColor{}
	})
}

// connect N mempool reactors through N switches
func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
	reactors := make([]*Reactor, n)
	logger := mempoolLogger()
	for i := 0; i < n; i++ {
		app := kvstore.NewApplication()
		cc := proxy.NewLocalClientCreator(app)
		mempool, cleanup := newMempoolWithApp(cc)
		defer cleanup()

		reactors[i] = NewReactor(config.Mempool, mempool) // so we don't start the consensus states
		reactors[i].SetLogger(logger.With("validator", i))
	}

	p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
		s.AddReactor("MEMPOOL", reactors[i])
		return s
	}, p2p.Connect2Switches)
	return reactors
}

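// waitForTxsOnReactors waits, up to timeout, until every reactor's mempool
// contains all the given txs.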
func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) {
	// wait for the txs in all mempools
	wg := new(sync.WaitGroup)
	for i, reactor := range reactors {
		wg.Add(1)
		go func(r *Reactor, reactorIndex int) {
			defer wg.Done()
			waitForTxsOnReactor(t, txs, r, reactorIndex)
		}(reactor, i)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.After(timeout)
	select {
	case <-timer:
		t.Fatal("Timed out waiting for txs")
	case <-done:
	}
}

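// waitForTxsOnReactor polls a single reactor's mempool until it holds len(txs)
// transactions, then checks that they match txs in order.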
func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) {
	mempool := reactor.mempool
	for mempool.Size() < len(txs) {
		time.Sleep(time.Millisecond * 100)
	}

	reapedTxs := mempool.ReapMaxTxs(len(txs))
	for i, tx := range txs {
		assert.Equalf(t, tx, reapedTxs[i],
			"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
	}
}

// ensure no txs on reactor after some timeout
func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) {
	time.Sleep(timeout) // wait for the txs in all mempools
	assert.Zero(t, reactor.mempool.Size())
}

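// TestMempoolVectors checks that proto-encoded mempool Tx messages match the
// expected byte vectors.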
func TestMempoolVectors(t *testing.T) {
	testCases := []struct {
		testName string
		tx       []byte
		expBytes string
	}{
		{"tx 1", []byte{123}, "0a030a017b"},
		{"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"},
	}

	for _, tc := range testCases {
		tc := tc

		msg := memproto.Message{
			Sum: &memproto.Message_Tx{
				Tx: &memproto.Tx{Tx: tc.tx},
			},
		}
		bz, err := msg.Marshal()
		require.NoError(t, err, tc.testName)

		require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
	}
}