mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-04 04:04:00 +00:00
mempool: do not launch broadcastTxRoutine if Broadcast is off
Refs #3479
This commit is contained in:
@@ -150,7 +150,9 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
// AddPeer implements Reactor.
|
||||
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
|
||||
func (memR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
go memR.broadcastTxRoutine(peer)
|
||||
if memR.config.Broadcast {
|
||||
go memR.broadcastTxRoutine(peer)
|
||||
}
|
||||
}
|
||||
|
||||
// RemovePeer implements Reactor.
|
||||
@@ -193,10 +195,6 @@ type PeerState interface {
|
||||
|
||||
// Send new mempool txs to peer.
|
||||
func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
if !memR.config.Broadcast {
|
||||
return
|
||||
}
|
||||
|
||||
peerID := memR.ids.GetForPeer(peer)
|
||||
var next *clist.CElement
|
||||
for {
|
||||
|
||||
@@ -20,6 +20,11 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
numTxs = 1000
|
||||
timeout = 120 * time.Second // ridiculously high because CircleCI is slow
|
||||
)
|
||||
|
||||
type peerState struct {
|
||||
height int64
|
||||
}
|
||||
@@ -28,90 +33,8 @@ func (ps peerState) GetHeight() int64 {
|
||||
return ps.height
|
||||
}
|
||||
|
||||
// mempoolLogger is a TestingLogger which uses a different
|
||||
// color for each validator ("validator" key must exist).
|
||||
func mempoolLogger() log.Logger {
|
||||
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
|
||||
for i := 0; i < len(keyvals)-1; i += 2 {
|
||||
if keyvals[i] == "validator" {
|
||||
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
|
||||
}
|
||||
}
|
||||
return term.FgBgColor{}
|
||||
})
|
||||
}
|
||||
|
||||
// connect N mempool reactors through N switches
|
||||
func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
|
||||
reactors := make([]*Reactor, n)
|
||||
logger := mempoolLogger()
|
||||
for i := 0; i < n; i++ {
|
||||
app := kvstore.NewApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
mempool, cleanup := newMempoolWithApp(cc)
|
||||
defer cleanup()
|
||||
|
||||
reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states
|
||||
reactors[i].SetLogger(logger.With("validator", i))
|
||||
}
|
||||
|
||||
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("MEMPOOL", reactors[i])
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
return reactors
|
||||
}
|
||||
|
||||
func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) {
|
||||
// wait for the txs in all mempools
|
||||
wg := new(sync.WaitGroup)
|
||||
for i, reactor := range reactors {
|
||||
wg.Add(1)
|
||||
go func(r *Reactor, reactorIndex int) {
|
||||
defer wg.Done()
|
||||
waitForTxsOnReactor(t, txs, r, reactorIndex)
|
||||
}(reactor, i)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
timer := time.After(Timeout)
|
||||
select {
|
||||
case <-timer:
|
||||
t.Fatal("Timed out waiting for txs")
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) {
|
||||
mempool := reactor.mempool
|
||||
for mempool.Size() < len(txs) {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
|
||||
reapedTxs := mempool.ReapMaxTxs(len(txs))
|
||||
for i, tx := range txs {
|
||||
assert.Equalf(t, tx, reapedTxs[i],
|
||||
"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// ensure no txs on reactor after some timeout
|
||||
func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) {
|
||||
time.Sleep(timeout) // wait for the txs in all mempools
|
||||
assert.Zero(t, reactor.mempool.Size())
|
||||
}
|
||||
|
||||
const (
|
||||
NumTxs = 1000
|
||||
Timeout = 120 * time.Second // ridiculously high because CircleCI is slow
|
||||
)
|
||||
|
||||
// Send a bunch of txs to the first reactor's mempool and wait for them all to
|
||||
// be received in the others.
|
||||
func TestReactorBroadcastTxMessage(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
const N = 4
|
||||
@@ -127,12 +50,12 @@ func TestReactorBroadcastTxMessage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// send a bunch of txs to the first reactor's mempool
|
||||
// and wait for them all to be received in the others
|
||||
txs := checkTxs(t, reactors[0].mempool, NumTxs, UnknownPeerID)
|
||||
txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID)
|
||||
waitForTxsOnReactors(t, txs, reactors)
|
||||
}
|
||||
|
||||
// Send a bunch of txs to the first reactor's mempool, claiming it came from peer
|
||||
// ensure peer gets no txs.
|
||||
func TestReactorNoBroadcastToSender(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
const N = 2
|
||||
@@ -143,10 +66,9 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
// send a bunch of txs to the first reactor's mempool, claiming it came from peer
|
||||
// ensure peer gets no txs
|
||||
checkTxs(t, reactors[0].mempool, NumTxs, 1)
|
||||
ensureNoTxs(t, reactors[1], 100*time.Millisecond)
|
||||
const peerID = 1
|
||||
checkTxs(t, reactors[0].mempool, numTxs, peerID)
|
||||
ensureNoTxs(t, reactors[peerID], 100*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
|
||||
@@ -241,3 +163,82 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
reactor.AddPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
// mempoolLogger is a TestingLogger which uses a different
|
||||
// color for each validator ("validator" key must exist).
|
||||
func mempoolLogger() log.Logger {
|
||||
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
|
||||
for i := 0; i < len(keyvals)-1; i += 2 {
|
||||
if keyvals[i] == "validator" {
|
||||
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
|
||||
}
|
||||
}
|
||||
return term.FgBgColor{}
|
||||
})
|
||||
}
|
||||
|
||||
// connect N mempool reactors through N switches
|
||||
func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
|
||||
reactors := make([]*Reactor, n)
|
||||
logger := mempoolLogger()
|
||||
for i := 0; i < n; i++ {
|
||||
app := kvstore.NewApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
mempool, cleanup := newMempoolWithApp(cc)
|
||||
defer cleanup()
|
||||
|
||||
reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states
|
||||
reactors[i].SetLogger(logger.With("validator", i))
|
||||
}
|
||||
|
||||
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("MEMPOOL", reactors[i])
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
return reactors
|
||||
}
|
||||
|
||||
func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) {
|
||||
// wait for the txs in all mempools
|
||||
wg := new(sync.WaitGroup)
|
||||
for i, reactor := range reactors {
|
||||
wg.Add(1)
|
||||
go func(r *Reactor, reactorIndex int) {
|
||||
defer wg.Done()
|
||||
waitForTxsOnReactor(t, txs, r, reactorIndex)
|
||||
}(reactor, i)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
timer := time.After(timeout)
|
||||
select {
|
||||
case <-timer:
|
||||
t.Fatal("Timed out waiting for txs")
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) {
|
||||
mempool := reactor.mempool
|
||||
for mempool.Size() < len(txs) {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
|
||||
reapedTxs := mempool.ReapMaxTxs(len(txs))
|
||||
for i, tx := range txs {
|
||||
assert.Equalf(t, tx, reapedTxs[i],
|
||||
"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// ensure no txs on reactor after some timeout
|
||||
func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) {
|
||||
time.Sleep(timeout) // wait for the txs in all mempools
|
||||
assert.Zero(t, reactor.mempool.Size())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user