Compare commits

..

1 Commits

Author SHA1 Message Date
William Banfield
15f2436c28 p2p: pqueue proposal 2022-06-29 18:19:12 -04:00
45 changed files with 1550 additions and 977 deletions

View File

@@ -27,5 +27,3 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS
### BUG FIXES
- [mempool] \#8944 Fix unbounded heap growth in the priority mempool. (@creachadair)

View File

@@ -802,13 +802,13 @@ func (_m *Client) Wait() {
_m.Called()
}
type mockConstructorTestingTNewClient interface {
type NewClientT interface {
mock.TestingT
Cleanup(func())
}
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewClient(t mockConstructorTestingTNewClient) *Client {
func NewClient(t NewClientT) *Client {
mock := &Client{}
mock.Mock.Test(t)

View File

@@ -34,6 +34,9 @@ func AddNodeFlags(cmd *cobra.Command) {
config.PrivValidator.ListenAddr,
"socket address to listen on for connections from external priv-validator process")
// node flags
cmd.Flags().Bool("blocksync.enable", config.BlockSync.Enable, "enable fast blockchain syncing")
// TODO (https://github.com/tendermint/tendermint/issues/6908): remove this check after the v0.35 release cycle
// This check was added to give users an upgrade prompt to use the new flag for syncing.
//

View File

@@ -778,7 +778,7 @@ func DefaultP2PConfig() *P2PConfig {
MaxNumInboundPeers: 40,
MaxNumOutboundPeers: 10,
MaxConnections: 64,
MaxOutgoingConnections: 12,
MaxOutgoingConnections: 32,
MaxIncomingConnectionAttempts: 100,
PersistentPeersMaxDialPeriod: 0 * time.Second,
FlushThrottleTimeout: 100 * time.Millisecond,

6
go.mod
View File

@@ -28,7 +28,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/lib/pq v1.10.6
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/minio/highwayhash v1.0.2
github.com/mroth/weightedrand v0.4.1
github.com/oasisprotocol/curve25519-voi v0.0.0-20210609091139-0a56a4bca00b
@@ -41,10 +41,10 @@ require (
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.7.2
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
github.com/tendermint/tm-db v0.6.6
github.com/vektra/mockery/v2 v2.14.0
github.com/vektra/mockery/v2 v2.13.1
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29

14
go.sum
View File

@@ -696,8 +696,8 @@ github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/lufeee/execinquery v1.0.0 h1:1XUTuLIVPDlFvUU3LXmmZwHDsolsxXnY67lzhpeqe0I=
github.com/lufeee/execinquery v1.0.0/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
@@ -1042,9 +1042,8 @@ github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v0.0.0-20170130113145-4d4bfba8f1d1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.1.4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -1054,9 +1053,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/subosito/gotenv v1.4.0 h1:yAzM1+SmVcz5R4tXGsNMu1jUl2aOJXoiWUCEwwnGrvs=
@@ -1106,8 +1104,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vektra/mockery/v2 v2.14.0 h1:KZ1p5Hrn8tiY+LErRMr14HHle6khxo+JKOXLBW/yfqs=
github.com/vektra/mockery/v2 v2.14.0/go.mod h1:bnD1T8tExSgPD1ripLkDbr60JA9VtQeu12P3wgLZd7M=
github.com/vektra/mockery/v2 v2.13.1 h1:Lqs7aZiC7TwZO76fJ/4Zsb3NaO4F7cuuz0mZLYeNwtQ=
github.com/vektra/mockery/v2 v2.13.1/go.mod h1:bnD1T8tExSgPD1ripLkDbr60JA9VtQeu12P3wgLZd7M=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=

View File

@@ -27,13 +27,13 @@ func (_m *ConsSyncReactor) SwitchToConsensus(_a0 state.State, _a1 bool) {
_m.Called(_a0, _a1)
}
type mockConstructorTestingTNewConsSyncReactor interface {
type NewConsSyncReactorT interface {
mock.TestingT
Cleanup(func())
}
// NewConsSyncReactor creates a new instance of ConsSyncReactor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConsSyncReactor(t mockConstructorTestingTNewConsSyncReactor) *ConsSyncReactor {
func NewConsSyncReactor(t NewConsSyncReactorT) *ConsSyncReactor {
mock := &ConsSyncReactor{}
mock.Mock.Test(t)

View File

@@ -58,13 +58,13 @@ func (_m *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
return r0
}
type mockConstructorTestingTNewBlockStore interface {
type NewBlockStoreT interface {
mock.TestingT
Cleanup(func())
}
// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBlockStore(t mockConstructorTestingTNewBlockStore) *BlockStore {
func NewBlockStore(t NewBlockStoreT) *BlockStore {
mock := &BlockStore{}
mock.Mock.Test(t)

File diff suppressed because it is too large Load Diff

View File

@@ -95,18 +95,6 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
}
// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until
// its callback has finished executing. It fails t if CheckTx fails.
func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) {
done := make(chan struct{})
if err := txmp.CheckTx(context.Background(), []byte(spec), func(*abci.Response) {
close(done)
}, mempool.TxInfo{}); err != nil {
t.Fatalf("CheckTx for %q failed: %v", spec, err)
}
<-done
}
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
txs := make([]testTx, numTxs)
txInfo := mempool.TxInfo{SenderID: peerID}
@@ -208,76 +196,6 @@ func TestTxMempool_Size(t *testing.T) {
require.Equal(t, int64(2850), txmp.SizeBytes())
}
func TestTxMempool_Eviction(t *testing.T) {
txmp := setup(t, 0)
txmp.config.Size = 5
txmp.config.MaxTxsBytes = 60
txExists := func(spec string) bool {
txmp.Lock()
defer txmp.Unlock()
key := types.Tx(spec).Key()
_, ok := txmp.txByKey[key]
return ok
}
// A transaction bigger than the mempool should be rejected even when there
// are slots available.
mustCheckTx(t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1")
require.Equal(t, 0, txmp.Size())
// Nearly-fill the mempool with a low-priority transaction, to show that it
// is evicted even when slots are available for a higher-priority tx.
const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2"
mustCheckTx(t, txmp, bigTx)
require.Equal(t, 1, txmp.Size()) // bigTx is the only element
require.True(t, txExists(bigTx))
require.Equal(t, int64(len(bigTx)), txmp.SizeBytes())
// The next transaction should evict bigTx, because it is higher priority
// but does not fit on size.
mustCheckTx(t, txmp, "key1=0000=25")
require.True(t, txExists("key1=0000=25"))
require.False(t, txExists(bigTx))
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())
// Now fill up the rest of the slots with other transactions.
mustCheckTx(t, txmp, "key2=0001=5")
mustCheckTx(t, txmp, "key3=0002=10")
mustCheckTx(t, txmp, "key4=0003=3")
mustCheckTx(t, txmp, "key5=0004=3")
// A new transaction with low priority should be discarded.
mustCheckTx(t, txmp, "key6=0005=1")
require.False(t, txExists("key6=0005=1"))
// A new transaction with higher priority should evict key5, which is the
// newest of the two transactions with lowest priority.
mustCheckTx(t, txmp, "key7=0006=7")
require.True(t, txExists("key7=0006=7")) // new transaction added
require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained
// Another new transaction evicts the other low-priority element.
mustCheckTx(t, txmp, "key8=0007=20")
require.True(t, txExists("key8=0007=20"))
require.False(t, txExists("key4=0003=3"))
// Now the lowest-priority tx is 5, so that should be the next to go.
mustCheckTx(t, txmp, "key9=0008=9")
require.True(t, txExists("key9=0008=9"))
require.False(t, txExists("k3y2=0001=5"))
// Add a transaction that requires eviction of multiple lower-priority
// entries, in order to fit the size of the element.
mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11
require.True(t, txExists("key1=0000=25"))
require.True(t, txExists("key8=0007=20"))
require.True(t, txExists("key10=0123456789abcdef=11"))
require.False(t, txExists("key3=0002=10"))
require.False(t, txExists("key9=0008=9"))
require.False(t, txExists("key7=0006=7"))
}
func TestTxMempool_Flush(t *testing.T) {
txmp := setup(t, 0)
txs := checkTxs(t, txmp, 100, 0)
@@ -520,51 +438,6 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
require.Zero(t, txmp.SizeBytes())
}
func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
txmp := setup(t, 50)
txmp.config.TTLDuration = 5 * time.Millisecond
added1 := checkTxs(t, txmp, 25, 0)
require.Equal(t, len(added1), txmp.Size())
// Wait a while, then add some more transactions that should not be expired
// when the first batch TTLs out.
//
// ms: 0 1 2 3 4 5 6
// ^ ^ ^ ^
// | | | +-- Update (triggers pruning)
// | | +------ first batch expires
// | +-------------- second batch added
// +-------------------------- first batch added
//
// The exact intervals are not important except that the delta should be
// large relative to the cost of CheckTx (ms vs. ns is fine here).
time.Sleep(3 * time.Millisecond)
added2 := checkTxs(t, txmp, 25, 1)
// Wait a while longer, so that the first batch will expire.
time.Sleep(3 * time.Millisecond)
// Trigger an update so that pruning will occur.
txmp.Lock()
defer txmp.Unlock()
require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil))
// All the transactions in the original set should have been purged.
for _, tx := range added1 {
if _, ok := txmp.txByKey[tx.tx.Key()]; ok {
t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key())
}
}
// All the transactions added later should still be around.
for _, tx := range added2 {
if _, ok := txmp.txByKey[tx.tx.Key()]; !ok {
t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key())
}
}
}
func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp := setup(t, 500)
txmp.height = 100
@@ -572,6 +445,7 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
tTxs := checkTxs(t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, 100, txmp.heightIndex.Size())
// reap 5 txs at the next height -- no txs should expire
reapedTxs := txmp.ReapMaxTxs(5)
@@ -585,10 +459,12 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp.Unlock()
require.Equal(t, 95, txmp.Size())
require.Equal(t, 95, txmp.heightIndex.Size())
// check more txs at height 101
_ = checkTxs(t, txmp, 50, 1)
require.Equal(t, 145, txmp.Size())
require.Equal(t, 145, txmp.heightIndex.Size())
// Reap 5 txs at a height that would expire all the transactions from before
// the previous Update (height 100).
@@ -609,6 +485,7 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp.Unlock()
require.GreaterOrEqual(t, txmp.Size(), 45)
require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
}
func TestTxMempool_CheckTxPostCheckError(t *testing.T) {

View File

@@ -0,0 +1,159 @@
package v1
import (
"container/heap"
"sort"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
)
var _ heap.Interface = (*TxPriorityQueue)(nil)
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx tmsync.RWMutex
txs []*WrappedTx
}
func NewTxPriorityQueue() *TxPriorityQueue {
pq := &TxPriorityQueue{
txs: make([]*WrappedTx, 0),
}
heap.Init(pq)
return pq
}
// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be
// evicted to make room for another *WrappedTx with higher priority. If no such
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx
// indicate that these transactions can be removed due to them being of lower
// priority and that their total sum in size allows room for the incoming
// transaction according to the mempool's configured limits.
func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx {
pq.mtx.RLock()
defer pq.mtx.RUnlock()
txs := make([]*WrappedTx, len(pq.txs))
copy(txs, pq.txs)
sort.Slice(txs, func(i, j int) bool {
return txs[i].priority < txs[j].priority
})
var (
toEvict []*WrappedTx
i int
)
currSize := totalSize
// Loop over all transactions in ascending priority order evaluating those
// that are only of less priority than the provided argument. We continue
// evaluating transactions until there is sufficient capacity for the new
// transaction (size) as defined by txSize.
for i < len(txs) && txs[i].priority < priority {
toEvict = append(toEvict, txs[i])
currSize -= int64(txs[i].Size())
if currSize+txSize <= cap {
return toEvict
}
i++
}
return nil
}
// NumTxs returns the number of transactions in the priority queue. It is
// thread safe.
func (pq *TxPriorityQueue) NumTxs() int {
pq.mtx.RLock()
defer pq.mtx.RUnlock()
return len(pq.txs)
}
// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
if tx.heapIndex < len(pq.txs) {
heap.Remove(pq, tx.heapIndex)
}
}
// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
heap.Push(pq, tx)
}
// PopTx removes the top priority transaction from the queue. It is thread safe.
func (pq *TxPriorityQueue) PopTx() *WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()
x := heap.Pop(pq)
if x != nil {
return x.(*WrappedTx)
}
return nil
}
// Push implements the Heap interface.
//
// NOTE: A caller should never call Push. Use PushTx instead.
func (pq *TxPriorityQueue) Push(x interface{}) {
n := len(pq.txs)
item := x.(*WrappedTx)
item.heapIndex = n
pq.txs = append(pq.txs, item)
}
// Pop implements the Heap interface.
//
// NOTE: A caller should never call Pop. Use PopTx instead.
func (pq *TxPriorityQueue) Pop() interface{} {
old := pq.txs
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.heapIndex = -1 // for safety
pq.txs = old[0 : n-1]
return item
}
// Len implements the Heap interface.
//
// NOTE: A caller should never call Len. Use NumTxs instead.
func (pq *TxPriorityQueue) Len() int {
return len(pq.txs)
}
// Less implements the Heap interface. It returns true if the transaction at
// position i in the queue is of less priority than the transaction at position j.
func (pq *TxPriorityQueue) Less(i, j int) bool {
// If there exists two transactions with the same priority, consider the one
// that we saw the earliest as the higher priority transaction.
if pq.txs[i].priority == pq.txs[j].priority {
return pq.txs[i].timestamp.Before(pq.txs[j].timestamp)
}
// We want Pop to give us the highest, not lowest, priority so we use greater
// than here.
return pq.txs[i].priority > pq.txs[j].priority
}
// Swap implements the Heap interface. It swaps two transactions in the queue.
func (pq *TxPriorityQueue) Swap(i, j int) {
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
pq.txs[i].heapIndex = i
pq.txs[j].heapIndex = j
}

View File

@@ -0,0 +1,176 @@
package v1
import (
"math/rand"
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestTxPriorityQueue(t *testing.T) {
pq := NewTxPriorityQueue()
numTxs := 1000
priorities := make([]int, numTxs)
var wg sync.WaitGroup
for i := 1; i <= numTxs; i++ {
priorities[i-1] = i
wg.Add(1)
go func(i int) {
pq.PushTx(&WrappedTx{
priority: int64(i),
timestamp: time.Now(),
})
wg.Done()
}(i)
}
sort.Sort(sort.Reverse(sort.IntSlice(priorities)))
wg.Wait()
require.Equal(t, numTxs, pq.NumTxs())
// Wait a second and push a tx with a duplicate priority
time.Sleep(time.Second)
now := time.Now()
pq.PushTx(&WrappedTx{
priority: 1000,
timestamp: now,
})
require.Equal(t, 1001, pq.NumTxs())
tx := pq.PopTx()
require.Equal(t, 1000, pq.NumTxs())
require.Equal(t, int64(1000), tx.priority)
require.NotEqual(t, now, tx.timestamp)
gotPriorities := make([]int, 0)
for pq.NumTxs() > 0 {
gotPriorities = append(gotPriorities, int(pq.PopTx().priority))
}
require.Equal(t, priorities, gotPriorities)
}
func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) {
pq := NewTxPriorityQueue()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
values := make([]int, 1000)
for i := 0; i < 1000; i++ {
tx := make([]byte, 5) // each tx is 5 bytes
_, err := rng.Read(tx)
require.NoError(t, err)
x := rng.Intn(100000)
pq.PushTx(&WrappedTx{
tx: tx,
priority: int64(x),
})
values[i] = x
}
sort.Ints(values)
max := values[len(values)-1]
min := values[0]
totalSize := int64(len(values) * 5)
testCases := []struct {
name string
priority, txSize, totalSize, cap int64
expectedLen int
}{
{
name: "larest priority; single tx",
priority: int64(max + 1),
txSize: 5,
totalSize: totalSize,
cap: totalSize,
expectedLen: 1,
},
{
name: "larest priority; multi tx",
priority: int64(max + 1),
txSize: 17,
totalSize: totalSize,
cap: totalSize,
expectedLen: 4,
},
{
name: "larest priority; out of capacity",
priority: int64(max + 1),
txSize: totalSize + 1,
totalSize: totalSize,
cap: totalSize,
expectedLen: 0,
},
{
name: "smallest priority; no tx",
priority: int64(min - 1),
txSize: 5,
totalSize: totalSize,
cap: totalSize,
expectedLen: 0,
},
{
name: "small priority; no tx",
priority: int64(min),
txSize: 5,
totalSize: totalSize,
cap: totalSize,
expectedLen: 0,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
evictTxs := pq.GetEvictableTxs(tc.priority, tc.txSize, tc.totalSize, tc.cap)
require.Len(t, evictTxs, tc.expectedLen)
})
}
}
func TestTxPriorityQueue_RemoveTx(t *testing.T) {
pq := NewTxPriorityQueue()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
numTxs := 1000
values := make([]int, numTxs)
for i := 0; i < numTxs; i++ {
x := rng.Intn(100000)
pq.PushTx(&WrappedTx{
priority: int64(x),
})
values[i] = x
}
require.Equal(t, numTxs, pq.NumTxs())
sort.Ints(values)
max := values[len(values)-1]
wtx := pq.txs[pq.NumTxs()/2]
pq.RemoveTx(wtx)
require.Equal(t, numTxs-1, pq.NumTxs())
require.Equal(t, int64(max), pq.PopTx().priority)
require.Equal(t, numTxs-2, pq.NumTxs())
require.NotPanics(t, func() {
pq.RemoveTx(&WrappedTx{heapIndex: numTxs})
pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1})
})
require.Equal(t, numTxs-2, pq.NumTxs())
}

View File

@@ -308,6 +308,9 @@ func (r *Reactor) processPeerUpdates() {
}
func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) {
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement
// remove the peer ID from the map of routines and mark the waitgroup as done
defer func() {
r.mtx.Lock()
@@ -326,8 +329,6 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
}
}()
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement
for {
if !r.IsRunning() {
return
@@ -338,8 +339,8 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
// start from the beginning.
if nextGossipTx == nil {
select {
case <-r.mempool.TxsWaitChan(): // wait until a tx is available
if nextGossipTx = r.mempool.TxsFront(); nextGossipTx == nil {
case <-r.mempool.WaitForNextTx(): // wait until a tx is available
if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil {
continue
}
@@ -357,11 +358,9 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
memTx := nextGossipTx.Value.(*WrappedTx)
// Send the transaction to a peer if we didn't receive it from that peer.
//
// NOTE: Transaction batching was disabled due to:
// https://github.com/tendermint/tendermint/issues/5796
if !memTx.HasPeer(peerMempoolID) {
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
// Send the mempool tx to the corresponding peer. Note, the peer may be
// behind and thus would not be able to process the mempool tx correctly.
r.mempoolCh.Out <- p2p.Envelope{

View File

@@ -134,9 +134,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
primaryMempool.Lock()
primaryMempool.insertTx(next)
primaryMempool.Unlock()
}()
}

View File

@@ -1,87 +1,281 @@
package v1
import (
"sync"
"sort"
"time"
"github.com/tendermint/tendermint/internal/libs/clist"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/types"
)
// WrappedTx defines a wrapper around a raw transaction with additional metadata
// that is used for indexing.
type WrappedTx struct {
tx types.Tx // the original transaction data
hash types.TxKey // the transaction hash
height int64 // height when this transaction was initially checked (for expiry)
timestamp time.Time // time when transaction was entered (for TTL)
// tx represents the raw binary transaction data
tx types.Tx
mtx sync.Mutex
gasWanted int64 // app: gas required to execute this transaction
priority int64 // app: priority value for this transaction
sender string // app: assigned sender label
peers map[uint16]bool // peer IDs who have sent us this transaction
// hash defines the transaction hash and the primary key used in the mempool
hash types.TxKey
// height defines the height at which the transaction was validated at
height int64
// gasWanted defines the amount of gas the transaction sender requires
gasWanted int64
// priority defines the transaction's priority as specified by the application
// in the ResponseCheckTx response.
priority int64
// sender defines the transaction's sender as specified by the application in
// the ResponseCheckTx response.
sender string
// timestamp is the time at which the node first received the transaction from
// a peer. It is used as a second dimension is prioritizing transactions when
// two transactions have the same priority.
timestamp time.Time
// peers records a mapping of all peers that sent a given transaction
peers map[uint16]struct{}
// heapIndex defines the index of the item in the heap
heapIndex int
// gossipEl references the linked-list element in the gossip index
gossipEl *clist.CElement
// removed marks the transaction as removed from the mempool. This is set
// during RemoveTx and is needed due to the fact that a given existing
// transaction in the mempool can be evicted when it is simultaneously having
// a reCheckTx callback executed.
removed bool
}
// Size reports the size of the raw transaction in bytes.
func (w *WrappedTx) Size() int64 { return int64(len(w.tx)) }
func (wtx *WrappedTx) Size() int {
return len(wtx.tx)
}
// SetPeer adds the specified peer ID as a sender of w.
func (w *WrappedTx) SetPeer(id uint16) {
w.mtx.Lock()
defer w.mtx.Unlock()
if w.peers == nil {
w.peers = map[uint16]bool{id: true}
} else {
w.peers[id] = true
// TxStore implements a thread-safe mapping of valid transaction(s).
//
// NOTE:
// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative
// access is not allowed. Regardless, it is not expected for the mempool to
// need mutative access.
type TxStore struct {
mtx tmsync.RWMutex
hashTxs map[types.TxKey]*WrappedTx // primary index
senderTxs map[string]*WrappedTx // sender is defined by the ABCI application
}
func NewTxStore() *TxStore {
return &TxStore{
senderTxs: make(map[string]*WrappedTx),
hashTxs: make(map[types.TxKey]*WrappedTx),
}
}
// HasPeer reports whether the specified peer ID is a sender of w.
func (w *WrappedTx) HasPeer(id uint16) bool {
w.mtx.Lock()
defer w.mtx.Unlock()
_, ok := w.peers[id]
// Size returns the total number of transactions in the store.
func (txs *TxStore) Size() int {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
return len(txs.hashTxs)
}
// GetAllTxs returns all the transactions currently in the store.
func (txs *TxStore) GetAllTxs() []*WrappedTx {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
wTxs := make([]*WrappedTx, len(txs.hashTxs))
i := 0
for _, wtx := range txs.hashTxs {
wTxs[i] = wtx
i++
}
return wTxs
}
// GetTxBySender returns a *WrappedTx by the transaction's sender property
// defined by the ABCI application.
func (txs *TxStore) GetTxBySender(sender string) *WrappedTx {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
return txs.senderTxs[sender]
}
// GetTxByHash returns a *WrappedTx by the transaction's hash.
func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
return txs.hashTxs[hash]
}
// IsTxRemoved returns true if a transaction by hash is marked as removed and
// false otherwise.
func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
wtx, ok := txs.hashTxs[hash]
if ok {
return wtx.removed
}
return false
}
// SetTx stores a *WrappedTx by it's hash. If the transaction also contains a
// non-empty sender, we additionally store the transaction by the sender as
// defined by the ABCI application.
func (txs *TxStore) SetTx(wtx *WrappedTx) {
txs.mtx.Lock()
defer txs.mtx.Unlock()
if len(wtx.sender) > 0 {
txs.senderTxs[wtx.sender] = wtx
}
txs.hashTxs[wtx.tx.Key()] = wtx
}
// RemoveTx removes a *WrappedTx from the transaction store. It deletes all
// indexes of the transaction.
func (txs *TxStore) RemoveTx(wtx *WrappedTx) {
txs.mtx.Lock()
defer txs.mtx.Unlock()
if len(wtx.sender) > 0 {
delete(txs.senderTxs, wtx.sender)
}
delete(txs.hashTxs, wtx.tx.Key())
wtx.removed = true
}
// TxHasPeer returns true if a transaction by hash has a given peer ID and false
// otherwise. If the transaction does not exist, false is returned.
func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
wtx := txs.hashTxs[hash]
if wtx == nil {
return false
}
_, ok := wtx.peers[peerID]
return ok
}
// SetGasWanted sets the application-assigned gas requirement of w.
func (w *WrappedTx) SetGasWanted(gas int64) {
w.mtx.Lock()
defer w.mtx.Unlock()
w.gasWanted = gas
// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the
// given peerID to the WrappedTx's set of peers that sent us this transaction.
// We return true if we've already recorded the given peer for this transaction
// and false otherwise. If the transaction does not exist by hash, we return
// (nil, false).
func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) {
txs.mtx.Lock()
defer txs.mtx.Unlock()
wtx := txs.hashTxs[hash]
if wtx == nil {
return nil, false
}
if wtx.peers == nil {
wtx.peers = make(map[uint16]struct{})
}
if _, ok := wtx.peers[peerID]; ok {
return wtx, true
}
wtx.peers[peerID] = struct{}{}
return wtx, false
}
// GasWanted reports the application-assigned gas requirement of w.
func (w *WrappedTx) GasWanted() int64 {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.gasWanted
// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be
// used to build generic transaction indexes in the mempool. It accepts a
// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx
// references which is used during Insert in order to determine sorted order. If
// less returns true, a <= b.
type WrappedTxList struct {
mtx tmsync.RWMutex
txs []*WrappedTx
less func(*WrappedTx, *WrappedTx) bool
}
// SetSender sets the application-assigned sender of w.
func (w *WrappedTx) SetSender(sender string) {
w.mtx.Lock()
defer w.mtx.Unlock()
w.sender = sender
func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList {
return &WrappedTxList{
txs: make([]*WrappedTx, 0),
less: less,
}
}
// Sender reports the application-assigned sender of w.
func (w *WrappedTx) Sender() string {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.sender
// Size returns the number of WrappedTx objects in the list.
func (wtl *WrappedTxList) Size() int {
wtl.mtx.RLock()
defer wtl.mtx.RUnlock()
return len(wtl.txs)
}
// SetPriority sets the application-assigned priority of w.
func (w *WrappedTx) SetPriority(p int64) {
w.mtx.Lock()
defer w.mtx.Unlock()
w.priority = p
// Reset resets the list of transactions to an empty list.
func (wtl *WrappedTxList) Reset() {
wtl.mtx.Lock()
defer wtl.mtx.Unlock()
wtl.txs = make([]*WrappedTx, 0)
}
// Priority reports the application-assigned priority of w.
func (w *WrappedTx) Priority() int64 {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.priority
// Insert inserts a WrappedTx reference into the sorted list based on the list's
// comparator function.
func (wtl *WrappedTxList) Insert(wtx *WrappedTx) {
wtl.mtx.Lock()
defer wtl.mtx.Unlock()
i := sort.Search(len(wtl.txs), func(i int) bool {
return wtl.less(wtl.txs[i], wtx)
})
if i == len(wtl.txs) {
// insert at the end
wtl.txs = append(wtl.txs, wtx)
return
}
// Make space for the inserted element by shifting values at the insertion
// index up one index.
//
// NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs).
wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...)
wtl.txs[i] = wtx
}
// Remove attempts to remove a WrappedTx from the sorted list.
func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {
wtl.mtx.Lock()
defer wtl.mtx.Unlock()
i := sort.Search(len(wtl.txs), func(i int) bool {
return wtl.less(wtl.txs[i], wtx)
})
// Since the list is sorted, we evaluate all elements starting at i. Note, if
// the element does not exist, we may potentially evaluate the entire remainder
// of the list. However, a caller should not be expected to call Remove with a
// non-existing element.
for i < len(wtl.txs) {
if wtl.txs[i] == wtx {
wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...)
return
}
i++
}
}

View File

@@ -0,0 +1,230 @@
package v1
import (
"fmt"
"math/rand"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/types"
)
func TestTxStore_GetTxBySender(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
sender: "foo",
priority: 1,
timestamp: time.Now(),
}
res := txs.GetTxBySender(wtx.sender)
require.Nil(t, res)
txs.SetTx(wtx)
res = txs.GetTxBySender(wtx.sender)
require.NotNil(t, res)
require.Equal(t, wtx, res)
}
func TestTxStore_GetTxByHash(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
sender: "foo",
priority: 1,
timestamp: time.Now(),
}
key := wtx.tx.Key()
res := txs.GetTxByHash(key)
require.Nil(t, res)
txs.SetTx(wtx)
res = txs.GetTxByHash(key)
require.NotNil(t, res)
require.Equal(t, wtx, res)
}
func TestTxStore_SetTx(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
priority: 1,
timestamp: time.Now(),
}
key := wtx.tx.Key()
txs.SetTx(wtx)
res := txs.GetTxByHash(key)
require.NotNil(t, res)
require.Equal(t, wtx, res)
wtx.sender = "foo"
txs.SetTx(wtx)
res = txs.GetTxByHash(key)
require.NotNil(t, res)
require.Equal(t, wtx, res)
}
func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
priority: 1,
timestamp: time.Now(),
}
key := wtx.tx.Key()
txs.SetTx(wtx)
res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15)
require.Nil(t, res)
require.False(t, ok)
res, ok = txs.GetOrSetPeerByTxHash(key, 15)
require.NotNil(t, res)
require.False(t, ok)
res, ok = txs.GetOrSetPeerByTxHash(key, 15)
require.NotNil(t, res)
require.True(t, ok)
require.True(t, txs.TxHasPeer(key, 15))
require.False(t, txs.TxHasPeer(key, 16))
}
func TestTxStore_RemoveTx(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
priority: 1,
timestamp: time.Now(),
}
txs.SetTx(wtx)
key := wtx.tx.Key()
res := txs.GetTxByHash(key)
require.NotNil(t, res)
txs.RemoveTx(res)
res = txs.GetTxByHash(key)
require.Nil(t, res)
}
func TestTxStore_Size(t *testing.T) {
txStore := NewTxStore()
numTxs := 1000
for i := 0; i < numTxs; i++ {
txStore.SetTx(&WrappedTx{
tx: []byte(fmt.Sprintf("test_tx_%d", i)),
priority: int64(i),
timestamp: time.Now(),
})
}
require.Equal(t, numTxs, txStore.Size())
}
func TestWrappedTxList_Reset(t *testing.T) {
list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
})
require.Zero(t, list.Size())
for i := 0; i < 100; i++ {
list.Insert(&WrappedTx{height: int64(i)})
}
require.Equal(t, 100, list.Size())
list.Reset()
require.Zero(t, list.Size())
}
func TestWrappedTxList_Insert(t *testing.T) {
list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
})
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
var expected []int
for i := 0; i < 100; i++ {
height := rng.Int63n(10000)
expected = append(expected, int(height))
list.Insert(&WrappedTx{height: height})
if i%10 == 0 {
list.Insert(&WrappedTx{height: height})
expected = append(expected, int(height))
}
}
got := make([]int, list.Size())
for i, wtx := range list.txs {
got[i] = int(wtx.height)
}
sort.Ints(expected)
require.Equal(t, expected, got)
}
func TestWrappedTxList_Remove(t *testing.T) {
list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
})
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
var txs []*WrappedTx
for i := 0; i < 100; i++ {
height := rng.Int63n(10000)
tx := &WrappedTx{height: height}
txs = append(txs, tx)
list.Insert(tx)
if i%10 == 0 {
tx = &WrappedTx{height: height}
list.Insert(tx)
txs = append(txs, tx)
}
}
// remove a tx that does not exist
list.Remove(&WrappedTx{height: 20000})
// remove a tx that exists (by height) but not referenced
list.Remove(&WrappedTx{height: txs[0].height})
// remove a few existing txs
for i := 0; i < 25; i++ {
j := rng.Intn(len(txs))
list.Remove(txs[j])
txs = append(txs[:j], txs[j+1:]...)
}
expected := make([]int, len(txs))
for i, tx := range txs {
expected[i] = int(tx.height)
}
got := make([]int, list.Size())
for i, wtx := range list.txs {
got[i] = int(wtx.height)
}
sort.Ints(expected)
require.Equal(t, expected, got)
}

View File

@@ -807,8 +807,6 @@ func (ch *Channel) sendBytes(bytes []byte) bool {
return true
case <-time.After(defaultSendTimeout):
return false
case <-ch.conn.Quit():
return false
}
}

View File

@@ -209,13 +209,13 @@ func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error
return r0, r1
}
type mockConstructorTestingTNewConnection interface {
type NewConnectionT interface {
mock.TestingT
Cleanup(func())
}
// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnection(t mockConstructorTestingTNewConnection) *Connection {
func NewConnection(t NewConnectionT) *Connection {
mock := &Connection{}
mock.Mock.Test(t)

View File

@@ -333,13 +333,13 @@ func (_m *Peer) Wait() {
_m.Called()
}
type mockConstructorTestingTNewPeer interface {
type NewPeerT interface {
mock.TestingT
Cleanup(func())
}
// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeer(t mockConstructorTestingTNewPeer) *Peer {
func NewPeer(t NewPeerT) *Peer {
mock := &Peer{}
mock.Mock.Test(t)

View File

@@ -120,13 +120,13 @@ func (_m *Transport) String() string {
return r0
}
type mockConstructorTestingTNewTransport interface {
type NewTransportT interface {
mock.TestingT
Cleanup(func())
}
// NewTransport creates a new instance of Transport. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewTransport(t mockConstructorTestingTNewTransport) *Transport {
func NewTransport(t NewTransportT) *Transport {
mock := &Transport{}
mock.Mock.Test(t)

View File

@@ -237,13 +237,12 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
require.Len(t, transport.Endpoints(), 1, "transport not listening on 1 endpoint")
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MinRetryTime: 10 * time.Millisecond,
DisconnectCooldownPeriod: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
Metrics: p2p.NopMetrics(),
MinRetryTime: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
Metrics: p2p.NopMetrics(),
})
require.NoError(t, err)

View File

@@ -975,7 +975,7 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
}
var numAddresses int
var totalAbsScore int
var totalScore int
ranked := m.store.Ranked()
seenAddresses := map[NodeAddress]struct{}{}
scores := map[types.NodeID]int{}
@@ -986,12 +986,8 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
continue
}
score := int(peer.Score())
if score < 0 {
totalAbsScore += -score
} else {
totalAbsScore += score
}
totalScore += score
scores[peer.ID] = score
for addr := range peer.AddressInfo {
if _, ok := m.options.PrivatePeers[addr.NodeID]; !ok {
@@ -1000,8 +996,6 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
}
}
meanAbsScore := (totalAbsScore + 1) / (len(scores) + 1)
var attempts uint16
var addedLastIteration bool
@@ -1050,7 +1044,7 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
// peer.
// nolint:gosec // G404: Use of weak random number generator
if numAddresses <= int(limit) || rand.Intn((meanAbsScore*2)+1) <= scores[peer.ID]+1 || rand.Intn((idx+1)*10) <= idx+1 {
if numAddresses <= int(limit) || rand.Intn(totalScore+1) <= scores[peer.ID]+1 || rand.Intn((idx+1)*10) <= idx+1 {
addresses = append(addresses, addressInfo.Address)
addedLastIteration = true
seenAddresses[addressInfo.Address] = struct{}{}
@@ -1452,6 +1446,47 @@ func (s *peerStore) Ranked() []*peerInfo {
}
sort.Slice(s.ranked, func(i, j int) bool {
return s.ranked[i].Score() > s.ranked[j].Score()
// TODO: reevaluate more wholistic sorting, perhaps as follows:
// // sort inactive peers after active peers
// if s.ranked[i].Inactive && !s.ranked[j].Inactive {
// return false
// } else if !s.ranked[i].Inactive && s.ranked[j].Inactive {
// return true
// }
// iLastDialed, iLastDialSuccess := s.ranked[i].LastDialed()
// jLastDialed, jLastDialSuccess := s.ranked[j].LastDialed()
// // sort peers who our most recent dialing attempt was
// // successful ahead of peers with recent dialing
// // failures
// switch {
// case iLastDialSuccess && jLastDialSuccess:
// // if both peers were (are?) successfully
// // connected, convey their score, but give the
// // one we dialed successfully most recently a bonus
// iScore := s.ranked[i].Score()
// jScore := s.ranked[j].Score()
// if jLastDialed.Before(iLastDialed) {
// jScore++
// } else {
// iScore++
// }
// return iScore > jScore
// case iLastDialSuccess:
// return true
// case jLastDialSuccess:
// return false
// default:
// // if both peers were not successful in their
// // most recent dialing attempt, fall back to
// // peer score.
// return s.ranked[i].Score() > s.ranked[j].Score()
// }
})
return s.ranked
}

View File

@@ -29,16 +29,8 @@ func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
// if both elements have the same priority, prioritize based
// on most recent and largest
// if both elements have the same priority, prioritize based on most recent
if pq[i].priority == pq[j].priority {
diff := pq[i].timestamp.Sub(pq[j].timestamp)
if diff < 0 {
diff *= -1
}
if diff < 10*time.Millisecond {
return pq[i].size > pq[j].size
}
return pq[i].timestamp.After(pq[j].timestamp)
}
@@ -164,6 +156,7 @@ func (s *pqScheduler) start() {
func (s *pqScheduler) process() {
defer s.done.Close()
LOOP:
for {
select {
case e := <-s.enqueueCh:
@@ -255,21 +248,24 @@ func (s *pqScheduler) process() {
for s.pq.Len() > 0 {
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
case <-s.closer.Done():
return
default:
heap.Push(s.pq, pqEnv)
continue LOOP
}
}
@@ -280,10 +276,12 @@ func (s *pqScheduler) process() {
}
func (s *pqScheduler) push(pqEnv *pqEnvelope) {
chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))
// enqueue the incoming Envelope
heap.Push(s.pq, pqEnv)
s.size += pqEnv.size
s.metrics.PeerQueueMsgSize.With("ch_id", strconv.Itoa(int(pqEnv.envelope.channelID))).Add(float64(pqEnv.size))
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Add(float64(pqEnv.size))
// Update the cumulative sizes by adding the Envelope's size to every
// priority less than or equal to it.

View File

@@ -40,10 +40,6 @@ type Envelope struct {
channelID ChannelID
}
func (e Envelope) IsZero() bool {
return e.From == "" && e.To == "" && e.Message == nil
}
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
@@ -170,10 +166,9 @@ type RouterOptions struct {
}
const (
queueTypeFifo = "fifo"
queueTypePriority = "priority"
queueTypeWDRR = "wdrr"
queueTypeSimplePriority = "simple-priority"
queueTypeFifo = "fifo"
queueTypePriority = "priority"
queueTypeWDRR = "wdrr"
)
// Validate validates router options.
@@ -181,8 +176,8 @@ func (o *RouterOptions) Validate() error {
switch o.QueueType {
case "":
o.QueueType = queueTypeFifo
case queueTypeFifo, queueTypeWDRR, queueTypePriority, queueTypeSimplePriority:
// pass
case queueTypeFifo, queueTypeWDRR, queueTypePriority:
// passI me
default:
return fmt.Errorf("queue type %q is not supported", o.QueueType)
}
@@ -359,9 +354,6 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
return q
}, nil
case queueTypeSimplePriority:
return func(size int) queue { return newSimplePriorityQueue(r.stopCtx(), size, r.chDescs) }, nil
default:
return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType)
}
@@ -427,12 +419,7 @@ func (r *Router) routeChannel(
) {
for {
select {
case envelope, ok := <-outCh:
if !ok {
return
} else if envelope.IsZero() {
continue
}
case envelope := <-outCh:
// Mark the envelope with the channel ID to allow sendPeer() to pass
// it on to Transport.SendMessage().
envelope.channelID = chID
@@ -509,10 +496,7 @@ func (r *Router) routeChannel(
}
}
case peerError, ok := <-errCh:
if !ok {
return
}
case peerError := <-errCh:
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
r.logger.Error("peer error",
"peer", peerError.NodeID,

View File

@@ -1,112 +0,0 @@
package p2p
import (
"container/heap"
"context"
"sort"
"time"
"github.com/gogo/protobuf/proto"
)
type simpleQueue struct {
input chan Envelope
output chan Envelope
closeFn func()
closeCh <-chan struct{}
maxSize int
chDescs []ChannelDescriptor
}
func newSimplePriorityQueue(ctx context.Context, size int, chDescs []ChannelDescriptor) *simpleQueue {
if size%2 != 0 {
size++
}
ctx, cancel := context.WithCancel(ctx)
q := &simpleQueue{
input: make(chan Envelope, size*2),
output: make(chan Envelope, size/2),
maxSize: size * size,
closeCh: ctx.Done(),
closeFn: cancel,
}
go q.run(ctx)
return q
}
func (q *simpleQueue) enqueue() chan<- Envelope { return q.input }
func (q *simpleQueue) dequeue() <-chan Envelope { return q.output }
func (q *simpleQueue) close() { q.closeFn() }
func (q *simpleQueue) closed() <-chan struct{} { return q.closeCh }
func (q *simpleQueue) run(ctx context.Context) {
defer q.closeFn()
var chPriorities = make(map[ChannelID]uint, len(q.chDescs))
for _, chDesc := range q.chDescs {
chID := ChannelID(chDesc.ID)
chPriorities[chID] = uint(chDesc.Priority)
}
pq := make(priorityQueue, 0, q.maxSize)
heap.Init(&pq)
ticker := time.NewTicker(10 * time.Millisecond)
// must have a buffer of exactly one because both sides of
// this channel are used in this loop, and simply signals adds
// to the heap
signal := make(chan struct{}, 1)
for {
select {
case <-ctx.Done():
return
case <-q.closeCh:
return
case e := <-q.input:
// enqueue the incoming Envelope
heap.Push(&pq, &pqEnvelope{
envelope: e,
size: uint(proto.Size(e.Message)),
priority: chPriorities[e.channelID],
timestamp: time.Now().UTC(),
})
select {
case signal <- struct{}{}:
default:
if len(pq) > q.maxSize {
sort.Sort(pq)
pq = pq[:q.maxSize]
}
}
case <-ticker.C:
if len(pq) > q.maxSize {
sort.Sort(pq)
pq = pq[:q.maxSize]
}
if len(pq) > 0 {
select {
case signal <- struct{}{}:
default:
}
}
case <-signal:
SEND:
for len(pq) > 0 {
select {
case <-ctx.Done():
return
case <-q.closeCh:
return
case q.output <- heap.Pop(&pq).(*pqEnvelope).envelope:
continue SEND
default:
break SEND
}
}
}
}
}

View File

@@ -1,47 +0,0 @@
package p2p
import (
"context"
"testing"
"time"
)
func TestSimpleQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// set up a small queue with very small buffers so we can
// watch it shed load, then send a bunch of messages to the
// queue, most of which we'll watch it drop.
sq := newSimplePriorityQueue(ctx, 1, nil)
for i := 0; i < 100; i++ {
sq.enqueue() <- Envelope{From: "merlin"}
}
seen := 0
RETRY:
for seen <= 2 {
select {
case e := <-sq.dequeue():
if e.From != "merlin" {
continue
}
seen++
case <-time.After(10 * time.Millisecond):
break RETRY
}
}
// if we don't see any messages, then it's just broken.
if seen == 0 {
t.Errorf("seen %d messages, should have seen more than one", seen)
}
// ensure that load shedding happens: there can be at most 3
// messages that we get out of this, one that was buffered
// plus 2 that were under the cap, everything else gets
// dropped.
if seen > 3 {
t.Errorf("saw %d messages, should have seen 5 or fewer", seen)
}
}

View File

@@ -296,7 +296,7 @@ func (c *mConnConnection) Handshake(
select {
case <-handshakeCtx.Done():
_ = c.Close()
return types.NodeInfo{}, nil, handshakeCtx.Err()
return types.NodeInfo{}, nil, ctx.Err()
case err := <-errCh:
if err != nil {

View File

@@ -151,13 +151,13 @@ func (_m *AppConnConsensus) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}
type mockConstructorTestingTNewAppConnConsensus interface {
type NewAppConnConsensusT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnConsensus creates a new instance of AppConnConsensus. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnConsensus(t mockConstructorTestingTNewAppConnConsensus) *AppConnConsensus {
func NewAppConnConsensus(t NewAppConnConsensusT) *AppConnConsensus {
mock := &AppConnConsensus{}
mock.Mock.Test(t)

View File

@@ -119,13 +119,13 @@ func (_m *AppConnMempool) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}
type mockConstructorTestingTNewAppConnMempool interface {
type NewAppConnMempoolT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnMempool creates a new instance of AppConnMempool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnMempool(t mockConstructorTestingTNewAppConnMempool) *AppConnMempool {
func NewAppConnMempool(t NewAppConnMempoolT) *AppConnMempool {
mock := &AppConnMempool{}
mock.Mock.Test(t)

View File

@@ -98,13 +98,13 @@ func (_m *AppConnQuery) QuerySync(_a0 context.Context, _a1 types.RequestQuery) (
return r0, r1
}
type mockConstructorTestingTNewAppConnQuery interface {
type NewAppConnQueryT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnQuery creates a new instance of AppConnQuery. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnQuery(t mockConstructorTestingTNewAppConnQuery) *AppConnQuery {
func NewAppConnQuery(t NewAppConnQueryT) *AppConnQuery {
mock := &AppConnQuery{}
mock.Mock.Test(t)

View File

@@ -121,13 +121,13 @@ func (_m *AppConnSnapshot) OfferSnapshotSync(_a0 context.Context, _a1 types.Requ
return r0, r1
}
type mockConstructorTestingTNewAppConnSnapshot interface {
type NewAppConnSnapshotT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnSnapshot creates a new instance of AppConnSnapshot. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnSnapshot(t mockConstructorTestingTNewAppConnSnapshot) *AppConnSnapshot {
func NewAppConnSnapshot(t NewAppConnSnapshotT) *AppConnSnapshot {
mock := &AppConnSnapshot{}
mock.Mock.Test(t)

View File

@@ -166,13 +166,13 @@ func (_m *EventSink) Type() indexer.EventSinkType {
return r0
}
type mockConstructorTestingTNewEventSink interface {
type NewEventSinkT interface {
mock.TestingT
Cleanup(func())
}
// NewEventSink creates a new instance of EventSink. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewEventSink(t mockConstructorTestingTNewEventSink) *EventSink {
func NewEventSink(t NewEventSinkT) *EventSink {
mock := &EventSink{}
mock.Mock.Test(t)

View File

@@ -209,13 +209,13 @@ func (_m *BlockStore) Size() int64 {
return r0
}
type mockConstructorTestingTNewBlockStore interface {
type NewBlockStoreT interface {
mock.TestingT
Cleanup(func())
}
// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBlockStore(t mockConstructorTestingTNewBlockStore) *BlockStore {
func NewBlockStore(t NewBlockStoreT) *BlockStore {
mock := &BlockStore{}
mock.Mock.Test(t)

View File

@@ -69,13 +69,13 @@ func (_m *EvidencePool) Update(_a0 state.State, _a1 types.EvidenceList) {
_m.Called(_a0, _a1)
}
type mockConstructorTestingTNewEvidencePool interface {
type NewEvidencePoolT interface {
mock.TestingT
Cleanup(func())
}
// NewEvidencePool creates a new instance of EvidencePool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewEvidencePool(t mockConstructorTestingTNewEvidencePool) *EvidencePool {
func NewEvidencePool(t NewEvidencePoolT) *EvidencePool {
mock := &EvidencePool{}
mock.Mock.Test(t)

View File

@@ -187,13 +187,13 @@ func (_m *Store) SaveValidatorSets(_a0 int64, _a1 int64, _a2 *types.ValidatorSet
return r0
}
type mockConstructorTestingTNewStore interface {
type NewStoreT interface {
mock.TestingT
Cleanup(func())
}
// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewStore(t mockConstructorTestingTNewStore) *Store {
func NewStore(t NewStoreT) *Store {
mock := &Store{}
mock.Mock.Test(t)

View File

@@ -83,13 +83,13 @@ func (_m *StateProvider) State(ctx context.Context, height uint64) (state.State,
return r0, r1
}
type mockConstructorTestingTNewStateProvider interface {
type NewStateProviderT interface {
mock.TestingT
Cleanup(func())
}
// NewStateProvider creates a new instance of StateProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewStateProvider(t mockConstructorTestingTNewStateProvider) *StateProvider {
func NewStateProvider(t NewStateProviderT) *StateProvider {
mock := &StateProvider{}
mock.Mock.Test(t)

View File

@@ -75,7 +75,7 @@ func MustNewDefaultLogger(format, level string, trace bool) Logger {
}
func (l defaultLogger) Info(msg string, keyVals ...interface{}) {
l.Logger.Info().Fields(keyVals).Msg(msg)
l.Logger.Info().Fields(getLogFields(keyVals...)).Msg(msg)
}
func (l defaultLogger) Error(msg string, keyVals ...interface{}) {
@@ -84,16 +84,29 @@ func (l defaultLogger) Error(msg string, keyVals ...interface{}) {
e = e.Stack()
}
e.Fields(keyVals).Msg(msg)
e.Fields(getLogFields(keyVals...)).Msg(msg)
}
func (l defaultLogger) Debug(msg string, keyVals ...interface{}) {
l.Logger.Debug().Fields(keyVals).Msg(msg)
l.Logger.Debug().Fields(getLogFields(keyVals...)).Msg(msg)
}
func (l defaultLogger) With(keyVals ...interface{}) Logger {
return defaultLogger{
Logger: l.Logger.With().Fields(keyVals).Logger(),
Logger: l.Logger.With().Fields(getLogFields(keyVals...)).Logger(),
trace: l.trace,
}
}
func getLogFields(keyVals ...interface{}) map[string]interface{} {
if len(keyVals)%2 != 0 {
return nil
}
fields := make(map[string]interface{}, len(keyVals))
for i := 0; i < len(keyVals); i += 2 {
fields[fmt.Sprint(keyVals[i])] = keyVals[i+1]
}
return fields
}

View File

@@ -52,13 +52,13 @@ func (_m *Provider) ReportEvidence(_a0 context.Context, _a1 types.Evidence) erro
return r0
}
type mockConstructorTestingTNewProvider interface {
type NewProviderT interface {
mock.TestingT
Cleanup(func())
}
// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewProvider(t mockConstructorTestingTNewProvider) *Provider {
func NewProvider(t NewProviderT) *Provider {
mock := &Provider{}
mock.Mock.Test(t)

View File

@@ -100,13 +100,13 @@ func (_m *LightClient) VerifyLightBlockAtHeight(ctx context.Context, height int6
return r0, r1
}
type mockConstructorTestingTNewLightClient interface {
type NewLightClientT interface {
mock.TestingT
Cleanup(func())
}
// NewLightClient creates a new instance of LightClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewLightClient(t mockConstructorTestingTNewLightClient) *LightClient {
func NewLightClient(t NewLightClientT) *LightClient {
mock := &LightClient{}
mock.Mock.Test(t)

View File

@@ -250,7 +250,7 @@ func makeNode(cfg *config.Config,
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
blockSync := cfg.BlockSync.Enable && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, consensusLogger, cfg.Mode)
@@ -700,8 +700,10 @@ func (n *nodeImpl) OnStart() error {
}
if n.config.Mode != config.ModeSeed {
if err := n.bcReactor.Start(); err != nil {
return err
if n.config.BlockSync.Version == config.BlockSyncV0 {
if err := n.bcReactor.Start(); err != nil {
return err
}
}
// Start the real consensus reactor separately since the switch uses the shim.
@@ -785,18 +787,22 @@ func (n *nodeImpl) OnStart() error {
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(state); err != nil {
n.Logger.Error("failed to switch to block sync", "err", err)
return
}
if n.config.BlockSync.Enable {
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(state); err != nil {
n.Logger.Error("failed to switch to block sync", "err", err)
return
}
s := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := n.eventBus.PublishEventBlockSyncStatus(s); err != nil {
n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
}
d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
}
} else {
n.consensusReactor.SwitchToConsensus(state, true)
}
}()
}
@@ -824,10 +830,11 @@ func (n *nodeImpl) OnStop() {
if n.config.Mode != config.ModeSeed {
// now stop the reactors
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
if n.config.BlockSync.Version == config.BlockSyncV0 {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
}
}
// Stop the real consensus reactor separately since the switch uses the shim.

View File

@@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
@@ -338,10 +339,6 @@ func createBlockchainReactor(
metrics *consensus.Metrics,
) (*p2p.ReactorShim, service.Service, error) {
if !cfg.BlockSync.Enable {
logger.Error("blocksync.enable = false, but Tendermint no longer allows blocksync to be disabled. This setting is now ignored and will be removed in the next version.")
}
logger = logger.With("module", "blockchain")
switch cfg.BlockSync.Version {
@@ -445,23 +442,12 @@ func createConsensusReactor(
}
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
var maxAccepted uint32
switch {
case cfg.P2P.MaxConnections > 0 && !cfg.P2P.UseLegacy:
maxAccepted = uint32(cfg.P2P.MaxConnections) +
uint32(len(tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ")))
case cfg.P2P.MaxNumInboundPeers > 0:
maxAccepted = uint32(cfg.P2P.MaxNumInboundPeers) +
uint32(len(tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ")))
default:
maxAccepted = 0
}
return p2p.NewMConnTransport(
logger, p2p.MConnConfig(cfg.P2P), []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: maxAccepted,
MaxAcceptedConnections: uint32(cfg.P2P.MaxNumInboundPeers +
len(tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ")),
),
},
)
}
@@ -760,8 +746,10 @@ func makeNodeInfo(
switch cfg.BlockSync.Version {
case config.BlockSyncV0:
bcChannel = byte(bcv0.BlockSyncChannel)
case config.BlockSyncV2:
return types.NodeInfo{}, fmt.Errorf("unsupported blocksync version %s", cfg.BlockSync.Version)
bcChannel = bcv2.BlockchainChannel
default:
return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", cfg.BlockSync.Version)
}

View File

@@ -801,13 +801,13 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP
return r0, r1
}
type mockConstructorTestingTNewClient interface {
type NewClientT interface {
mock.TestingT
Cleanup(func())
}
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewClient(t mockConstructorTestingTNewClient) *Client {
func NewClient(t NewClientT) *Client {
mock := &Client{}
mock.Mock.Test(t)

View File

@@ -47,6 +47,8 @@ var (
"tcp": 20,
"unix": 10,
}
// FIXME: v2 disabled due to flake
nodeBlockSyncs = uniformChoice{"v0"} // "v2"
nodeMempools = uniformChoice{"v0", "v1"}
nodeStateSyncs = weightedChoice{
e2e.StateSyncDisabled: 10,
@@ -395,7 +397,7 @@ func generateNode(
StartAt: startAt,
Database: nodeDatabases.Choose(r),
PrivvalProtocol: nodePrivvalProtocols.Choose(r),
BlockSync: "v0",
BlockSync: nodeBlockSyncs.Choose(r).(string),
Mempool: nodeMempools.Choose(r).(string),
StateSync: e2e.StateSyncDisabled,
PersistInterval: ptrUint64(uint64(nodePersistIntervals.Choose(r).(int))),

View File

@@ -43,6 +43,7 @@ persist_interval = 0
perturb = ["restart"]
privval_protocol = "tcp"
seeds = ["seed01"]
block_sync = "v0"
[node.validator03]
database = "badgerdb"
@@ -51,6 +52,7 @@ abci_protocol = "grpc"
persist_interval = 3
perturb = ["kill"]
privval_protocol = "grpc"
block_sync = "v0"
retain_blocks = 10
[node.validator04]
@@ -59,9 +61,11 @@ snapshot_interval = 5
database = "rocksdb"
persistent_peers = ["validator01"]
perturb = ["pause"]
block_sync = "v0"
[node.validator05]
database = "cleveldb"
block_sync = "v0"
state_sync = "p2p"
seeds = ["seed01"]
start_at = 1005 # Becomes part of the validator set at 1010
@@ -72,6 +76,7 @@ privval_protocol = "tcp"
[node.full01]
mode = "full"
start_at = 1010
block_sync = "v0"
persistent_peers = ["validator01", "validator02", "validator03", "validator04"]
perturb = ["restart"]
retain_blocks = 10

View File

@@ -178,7 +178,7 @@ func LoadTestnet(file string) (*Testnet, error) {
ABCIProtocol: Protocol(testnet.ABCIProtocol),
PrivvalProtocol: ProtocolFile,
StartAt: nodeManifest.StartAt,
BlockSync: "v0",
BlockSync: nodeManifest.BlockSync,
Mempool: nodeManifest.Mempool,
StateSync: nodeManifest.StateSync,
PersistInterval: 1,