Compare commits

...

2 Commits

Author SHA1 Message Date
mergify[bot]
7fa14accff p2p: remove dial sleep and provide disconnect cooldown (backport #8839) (#8875)
(cherry picked from commit 52b6dc19ba)
2022-06-28 19:06:50 -04:00
William Banfield
af9bca5013 p2p: set empty timeouts to configed values. (manual backport of #8847) (#8869)
* regenerate mocks using newer style

* p2p: set empty timeouts to small values. (#8847)

These timeouts default to 'do not time out' if they are not set. This times up resources, potentially indefinitely. If node on the other side of the the handshake is up but unresponsive, the[ handshake call](edec79448a/internal/p2p/router.go (L720)) will _never_ return.

* fix light client select statement
2022-06-28 19:02:29 -04:00
34 changed files with 398 additions and 107 deletions

View File

@@ -801,3 +801,18 @@ func (_m *Client) String() string {
func (_m *Client) Wait() {
_m.Called()
}
type NewClientT interface {
mock.TestingT
Cleanup(func())
}
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewClient(t NewClientT) *Client {
mock := &Client{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -26,3 +26,18 @@ func (_m *ConsSyncReactor) SetStateSyncingMetrics(_a0 float64) {
func (_m *ConsSyncReactor) SwitchToConsensus(_a0 state.State, _a1 bool) {
_m.Called(_a0, _a1)
}
type NewConsSyncReactorT interface {
mock.TestingT
Cleanup(func())
}
// NewConsSyncReactor creates a new instance of ConsSyncReactor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConsSyncReactor(t NewConsSyncReactorT) *ConsSyncReactor {
mock := &ConsSyncReactor{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -57,3 +57,18 @@ func (_m *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
return r0
}
type NewBlockStoreT interface {
mock.TestingT
Cleanup(func())
}
// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBlockStore(t NewBlockStoreT) *BlockStore {
mock := &BlockStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -13,6 +13,8 @@ import (
p2p "github.com/tendermint/tendermint/internal/p2p"
time "time"
types "github.com/tendermint/tendermint/types"
)
@@ -49,20 +51,20 @@ func (_m *Connection) FlushClose() error {
return r0
}
// Handshake provides a mock function with given fields: _a0, _a1, _a2
func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
ret := _m.Called(_a0, _a1, _a2)
// Handshake provides a mock function with given fields: _a0, _a1, _a2, _a3
func (_m *Connection) Handshake(_a0 context.Context, _a1 time.Duration, _a2 types.NodeInfo, _a3 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
ret := _m.Called(_a0, _a1, _a2, _a3)
var r0 types.NodeInfo
if rf, ok := ret.Get(0).(func(context.Context, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok {
r0 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(0).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok {
r0 = rf(_a0, _a1, _a2, _a3)
} else {
r0 = ret.Get(0).(types.NodeInfo)
}
var r1 crypto.PubKey
if rf, ok := ret.Get(1).(func(context.Context, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok {
r1 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(1).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok {
r1 = rf(_a0, _a1, _a2, _a3)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(crypto.PubKey)
@@ -70,8 +72,8 @@ func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 cry
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, types.NodeInfo, crypto.PrivKey) error); ok {
r2 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(2).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) error); ok {
r2 = rf(_a0, _a1, _a2, _a3)
} else {
r2 = ret.Error(2)
}
@@ -206,3 +208,18 @@ func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error
return r0, r1
}
type NewConnectionT interface {
mock.TestingT
Cleanup(func())
}
// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnection(t NewConnectionT) *Connection {
mock := &Connection{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -332,3 +332,18 @@ func (_m *Peer) TrySend(_a0 byte, _a1 []byte) bool {
func (_m *Peer) Wait() {
_m.Called()
}
type NewPeerT interface {
mock.TestingT
Cleanup(func())
}
// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeer(t NewPeerT) *Peer {
mock := &Peer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -119,3 +119,18 @@ func (_m *Transport) String() string {
return r0
}
type NewTransportT interface {
mock.TestingT
Cleanup(func())
}
// NewTransport creates a new instance of Transport. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewTransport(t NewTransportT) *Transport {
mock := &Transport{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -1,7 +1,6 @@
package p2ptest
import (
"context"
"math/rand"
"testing"
"time"
@@ -254,7 +253,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
privKey,
peerManager,
[]p2p.Transport{transport},
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())

View File

@@ -90,7 +90,7 @@ func createOutboundPeerAndPerformHandshake(
if err != nil {
return nil, err
}
peerInfo, _, err := pc.conn.Handshake(context.Background(), ourNodeInfo, pk)
peerInfo, _, err := pc.conn.Handshake(context.Background(), 0, ourNodeInfo, pk)
if err != nil {
return nil, err
}
@@ -187,7 +187,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
if err != nil {
return nil, err
}
_, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey)
_, _, err = pc.conn.Handshake(context.Background(), 0, rp.nodeInfo(), rp.PrivKey)
if err != nil {
return nil, err
}
@@ -213,7 +213,7 @@ func (rp *remotePeer) accept() {
if err != nil {
golog.Printf("Failed to create a peer: %+v", err)
}
_, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey)
_, _, err = pc.conn.Handshake(context.Background(), 0, rp.nodeInfo(), rp.PrivKey)
if err != nil {
golog.Printf("Failed to handshake a peer: %+v", err)
}

View File

@@ -162,6 +162,10 @@ type PeerManagerOptions struct {
// retry times, to avoid thundering herds. 0 disables jitter.
RetryTimeJitter time.Duration
// DisconnectCooldownPeriod is the amount of time after we
// disconnect from a peer before we'll consider dialing a new peer
DisconnectCooldownPeriod time.Duration
// PeerScores sets fixed scores for specific peers. It is mainly used
// for testing. A score of 0 is ignored.
PeerScores map[types.NodeID]PeerScore
@@ -570,6 +574,10 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
continue
}
if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
continue
}
for _, addressInfo := range peer.AddressInfo {
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
continue
@@ -885,6 +893,22 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) {
delete(m.evicting, peerID)
delete(m.ready, peerID)
if peer, ok := m.store.Get(peerID); ok {
peer.LastDisconnected = time.Now()
_ = m.store.Set(peer)
// launch a thread to ping the dialWaker when the
// disconnected peer can be dialed again.
go func() {
timer := time.NewTimer(m.options.DisconnectCooldownPeriod)
defer timer.Stop()
select {
case <-timer.C:
m.dialWaker.Wake()
case <-m.closeCh:
}
}()
}
if ready {
m.broadcast(PeerUpdate{
NodeID: peerID,
@@ -1447,9 +1471,10 @@ func (s *peerStore) Size() int {
// peerInfo contains peer information stored in a peerStore.
type peerInfo struct {
ID types.NodeID
AddressInfo map[NodeAddress]*peerAddressInfo
LastConnected time.Time
ID types.NodeID
AddressInfo map[NodeAddress]*peerAddressInfo
LastConnected time.Time
LastDisconnected time.Time
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
@@ -1489,8 +1514,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
msg := &p2pproto.PeerInfo{
ID: string(p.ID),
LastConnected: &p.LastConnected,
Inactive: p.Inactive,
LastConnected: &p.LastConnected,
}
for _, addressInfo := range p.AddressInfo {
msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto())
@@ -1498,6 +1523,7 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
if msg.LastConnected.IsZero() {
msg.LastConnected = nil
}
return msg
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"runtime"
"sync"
@@ -160,12 +159,6 @@ type RouterOptions struct {
// return an error to reject the peer.
FilterPeerByID func(context.Context, types.NodeID) error
// DialSleep controls the amount of time that the router
// sleeps between dialing peers. If not set, a default value
// is used that sleeps for a (random) amount of time up to 3
// seconds between submitting each peer to be dialed.
DialSleep func(context.Context)
// NumConcrruentDials controls how many parallel go routines
// are used to dial peers. This defaults to the value of
// runtime.NumCPU.
@@ -552,30 +545,6 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
return r.options.FilterPeerByID(ctx, id)
}
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
const (
maxDialerInterval = 500
minDialerInterval = 100
)
// nolint:gosec // G404: Use of weak random number generator
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
timer := time.NewTimer(dur * time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done():
case <-timer.C:
}
return
}
r.options.DialSleep(ctx)
}
// acceptPeers accepts inbound connections from peers on the given transport,
// and spawns goroutines that route messages to/from them.
func (r *Router) acceptPeers(transport Transport) {
@@ -704,12 +673,7 @@ LOOP:
select {
case addresses <- address:
// this jitters the frequency that we call
// DialNext and prevents us from attempting to
// create connections too quickly.
r.dialSleep(ctx)
continue
continue LOOP
case <-ctx.Done():
close(addresses)
break LOOP
@@ -829,13 +793,7 @@ func (r *Router) handshakePeer(
expectID types.NodeID,
) (types.NodeInfo, crypto.PubKey, error) {
if r.options.HandshakeTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout)
defer cancel()
}
peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey)
peerInfo, peerKey, err := conn.Handshake(ctx, r.options.HandshakeTimeout, r.nodeInfo, r.privKey)
if err != nil {
return peerInfo, peerKey, err
}

View File

@@ -1,7 +1,6 @@
package p2p_test
import (
"context"
"errors"
"fmt"
"io"
@@ -352,7 +351,7 @@ func TestRouter_AcceptPeers(t *testing.T) {
closer := tmsync.NewCloser()
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
Return(tc.peerInfo, tc.peerKey, nil)
mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
@@ -462,7 +461,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
mockConnection.On("Close").Return(nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
@@ -543,7 +542,7 @@ func TestRouter_DialPeers(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
if tc.dialErr == nil {
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
Return(tc.peerInfo, tc.peerKey, nil)
mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil)
}
@@ -630,7 +629,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
mockConnection.On("Close").Return(nil)
@@ -671,7 +670,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{
DialSleep: func(_ context.Context) {},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {
@@ -710,7 +708,7 @@ func TestRouter_EvictPeers(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
Return(peerInfo, peerKey.PubKey(), nil)
mockConnection.On("ReceiveMessage").WaitUntil(closeCh).Return(chID, nil, io.EOF)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
@@ -779,7 +777,7 @@ func TestRouter_ChannelCompatability(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
Return(incompatiblePeer, peerKey.PubKey(), nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
mockConnection.On("Close").Return(nil)
@@ -828,7 +826,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
Return(peer, peerKey.PubKey(), nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
mockConnection.On("Close").Return(nil)

View File

@@ -865,11 +865,11 @@ func (sw *Switch) handshakePeer(
c Connection,
expectPeerID types.NodeID,
) (types.NodeInfo, crypto.PubKey, error) {
// Moved from transport and hardcoded until legacy P2P stack removal.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerInfo, peerKey, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
// Moved timeout from transport and hardcoded until legacy P2P stack removal.
peerInfo, peerKey, err := c.Handshake(ctx, 5*time.Second, sw.nodeInfo, sw.nodeKey.PrivKey)
if err != nil {
return peerInfo, peerKey, ErrRejected{
conn: c.(*mConnConnection).conn,

View File

@@ -267,7 +267,7 @@ func TestSwitchPeerFilter(t *testing.T) {
if err != nil {
t.Fatal(err)
}
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
if err != nil {
t.Fatal(err)
}
@@ -324,7 +324,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
if err != nil {
t.Fatal(err)
}
@@ -360,7 +360,7 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
if err != nil {
t.Fatal(err)
}
@@ -415,7 +415,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
if err != nil {
t.Fatal(err)
}
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
if err != nil {
t.Fatal(err)
}

View File

@@ -126,7 +126,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
}
return err
}
peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), sw.nodeInfo, sw.nodeKey.PrivKey)
peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), 0, sw.nodeInfo, sw.nodeKey.PrivKey)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/p2p/conn"
@@ -84,7 +85,7 @@ type Connection interface {
// FIXME: The handshake should really be the Router's responsibility, but
// that requires the connection interface to be byte-oriented rather than
// message-oriented (see comment above).
Handshake(context.Context, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)
Handshake(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)
// ReceiveMessage returns the next message received on the connection,
// blocking until one is available. Returns io.EOF if closed.

View File

@@ -9,6 +9,7 @@ import (
"net"
"strconv"
"sync"
"time"
"golang.org/x/net/netutil"
@@ -255,6 +256,7 @@ func newMConnConnection(
// Handshake implements Connection.
func (c *mConnConnection) Handshake(
ctx context.Context,
timeout time.Duration,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
) (types.NodeInfo, crypto.PubKey, error) {
@@ -264,6 +266,12 @@ func (c *mConnConnection) Handshake(
peerKey crypto.PubKey
errCh = make(chan error, 1)
)
handshakeCtx := ctx
if timeout > 0 {
var cancel context.CancelFunc
handshakeCtx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// To handle context cancellation, we need to do the handshake in a
// goroutine and abort the blocking network calls by closing the connection
// when the context is canceled.
@@ -276,12 +284,17 @@ func (c *mConnConnection) Handshake(
}
}()
var err error
mconn, peerInfo, peerKey, err = c.handshake(ctx, nodeInfo, privKey)
errCh <- err
mconn, peerInfo, peerKey, err = c.handshake(handshakeCtx, nodeInfo, privKey)
select {
case errCh <- err:
case <-handshakeCtx.Done():
}
}()
select {
case <-ctx.Done():
case <-handshakeCtx.Done():
_ = c.Close()
return types.NodeInfo{}, nil, ctx.Err()

View File

@@ -7,6 +7,7 @@ import (
"io"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/crypto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
@@ -270,9 +271,16 @@ func (c *MemoryConnection) Status() conn.ConnectionStatus {
// Handshake implements Connection.
func (c *MemoryConnection) Handshake(
ctx context.Context,
timeout time.Duration,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
) (types.NodeInfo, crypto.PubKey, error) {
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
select {
case c.sendCh <- memoryMessage{nodeInfo: &nodeInfo, pubKey: privKey.PubKey()}:
c.logger.Debug("sent handshake", "nodeInfo", nodeInfo)

View File

@@ -265,7 +265,7 @@ func TestConnection_Handshake(t *testing.T) {
errCh := make(chan error, 1)
go func() {
// Must use assert due to goroutine.
peerInfo, peerKey, err := ba.Handshake(ctx, bInfo, bKey)
peerInfo, peerKey, err := ba.Handshake(ctx, 0, bInfo, bKey)
if err == nil {
assert.Equal(t, aInfo, peerInfo)
assert.Equal(t, aKey.PubKey(), peerKey)
@@ -273,7 +273,7 @@ func TestConnection_Handshake(t *testing.T) {
errCh <- err
}()
peerInfo, peerKey, err := ab.Handshake(ctx, aInfo, aKey)
peerInfo, peerKey, err := ab.Handshake(ctx, 0, aInfo, aKey)
require.NoError(t, err)
require.Equal(t, bInfo, peerInfo)
require.Equal(t, bKey.PubKey(), peerKey)
@@ -291,7 +291,7 @@ func TestConnection_HandshakeCancel(t *testing.T) {
ab, ba := dialAccept(t, a, b)
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
cancel()
_, _, err := ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey())
_, _, err := ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey())
require.Error(t, err)
require.Equal(t, context.Canceled, err)
_ = ab.Close()
@@ -301,7 +301,7 @@ func TestConnection_HandshakeCancel(t *testing.T) {
ab, ba = dialAccept(t, a, b)
timeoutCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel()
_, _, err = ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey())
_, _, err = ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey())
require.Error(t, err)
require.Equal(t, context.DeadlineExceeded, err)
_ = ab.Close()
@@ -630,13 +630,13 @@ func dialAcceptHandshake(t *testing.T, a, b p2p.Transport) (p2p.Connection, p2p.
go func() {
privKey := ed25519.GenPrivKey()
nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())}
_, _, err := ba.Handshake(ctx, nodeInfo, privKey)
_, _, err := ba.Handshake(ctx, 0, nodeInfo, privKey)
errCh <- err
}()
privKey := ed25519.GenPrivKey()
nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())}
_, _, err := ab.Handshake(ctx, nodeInfo, privKey)
_, _, err := ab.Handshake(ctx, 0, nodeInfo, privKey)
require.NoError(t, err)
timer := time.NewTimer(2 * time.Second)

View File

@@ -150,3 +150,18 @@ func (_m *AppConnConsensus) InitChainSync(_a0 context.Context, _a1 types.Request
func (_m *AppConnConsensus) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}
type NewAppConnConsensusT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnConsensus creates a new instance of AppConnConsensus. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnConsensus(t NewAppConnConsensusT) *AppConnConsensus {
mock := &AppConnConsensus{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -118,3 +118,18 @@ func (_m *AppConnMempool) FlushSync(_a0 context.Context) error {
func (_m *AppConnMempool) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}
type NewAppConnMempoolT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnMempool creates a new instance of AppConnMempool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnMempool(t NewAppConnMempoolT) *AppConnMempool {
mock := &AppConnMempool{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -97,3 +97,18 @@ func (_m *AppConnQuery) QuerySync(_a0 context.Context, _a1 types.RequestQuery) (
return r0, r1
}
type NewAppConnQueryT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnQuery creates a new instance of AppConnQuery. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnQuery(t NewAppConnQueryT) *AppConnQuery {
mock := &AppConnQuery{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -120,3 +120,18 @@ func (_m *AppConnSnapshot) OfferSnapshotSync(_a0 context.Context, _a1 types.Requ
return r0, r1
}
type NewAppConnSnapshotT interface {
mock.TestingT
Cleanup(func())
}
// NewAppConnSnapshot creates a new instance of AppConnSnapshot. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewAppConnSnapshot(t NewAppConnSnapshotT) *AppConnSnapshot {
mock := &AppConnSnapshot{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -165,3 +165,18 @@ func (_m *EventSink) Type() indexer.EventSinkType {
return r0
}
type NewEventSinkT interface {
mock.TestingT
Cleanup(func())
}
// NewEventSink creates a new instance of EventSink. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewEventSink(t NewEventSinkT) *EventSink {
mock := &EventSink{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -208,3 +208,18 @@ func (_m *BlockStore) Size() int64 {
return r0
}
type NewBlockStoreT interface {
mock.TestingT
Cleanup(func())
}
// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBlockStore(t NewBlockStoreT) *BlockStore {
mock := &BlockStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -68,3 +68,18 @@ func (_m *EvidencePool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64
func (_m *EvidencePool) Update(_a0 state.State, _a1 types.EvidenceList) {
_m.Called(_a0, _a1)
}
type NewEvidencePoolT interface {
mock.TestingT
Cleanup(func())
}
// NewEvidencePool creates a new instance of EvidencePool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewEvidencePool(t NewEvidencePoolT) *EvidencePool {
mock := &EvidencePool{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -186,3 +186,18 @@ func (_m *Store) SaveValidatorSets(_a0 int64, _a1 int64, _a2 *types.ValidatorSet
return r0
}
type NewStoreT interface {
mock.TestingT
Cleanup(func())
}
// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewStore(t NewStoreT) *Store {
mock := &Store{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -82,3 +82,18 @@ func (_m *StateProvider) State(ctx context.Context, height uint64) (state.State,
return r0, r1
}
type NewStateProviderT interface {
mock.TestingT
Cleanup(func())
}
// NewStateProvider creates a new instance of StateProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewStateProvider(t NewStateProviderT) *StateProvider {
mock := &StateProvider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -1018,7 +1018,12 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
// process all the responses as they come in
for i := 0; i < cap(witnessResponsesC); i++ {
response := <-witnessResponsesC
var response witnessResponse
select {
case response = <-witnessResponsesC:
case <-ctx.Done():
return nil, ctx.Err()
}
switch response.err {
// success! We have found a new primary
case nil:
@@ -1047,10 +1052,6 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
// return the light block that new primary responded with
return response.lb, nil
// catch canceled contexts or deadlines
case context.Canceled, context.DeadlineExceeded:
return nil, response.err
// process benign errors by logging them only
case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
lastError = response.err

View File

@@ -51,3 +51,18 @@ func (_m *Provider) ReportEvidence(_a0 context.Context, _a1 types.Evidence) erro
return r0
}
type NewProviderT interface {
mock.TestingT
Cleanup(func())
}
// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewProvider(t NewProviderT) *Provider {
mock := &Provider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -99,3 +99,18 @@ func (_m *LightClient) VerifyLightBlockAtHeight(ctx context.Context, height int6
return r0, r1
}
type NewLightClientT interface {
mock.TestingT
Cleanup(func())
}
// NewLightClient creates a new instance of LightClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewLightClient(t NewLightClientT) *LightClient {
mock := &LightClient{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -1246,7 +1246,9 @@ func createAndStartPrivValidatorGRPCClient(
func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType,
QueueType: conf.P2P.QueueType,
HandshakeTimeout: conf.P2P.HandshakeTimeout,
DialTimeout: conf.P2P.DialTimeout,
}
if conf.P2P.MaxNumInboundPeers > 0 {

View File

@@ -504,17 +504,18 @@ func createPeerManager(
const maxUpgradeConns = 4
options := p2p.PeerManagerOptions{
SelfAddress: selfAddr,
MaxConnected: maxConns,
MaxOutgoingConnections: maxOutgoingConns,
MaxConnectedUpgrade: maxUpgradeConns,
MaxPeers: maxUpgradeConns + 4*maxConns,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
Metrics: metrics,
SelfAddress: selfAddr,
MaxConnected: maxConns,
MaxOutgoingConnections: maxOutgoingConns,
MaxConnectedUpgrade: maxUpgradeConns,
DisconnectCooldownPeriod: 2 * time.Second,
MaxPeers: maxUpgradeConns + 4*maxConns,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
Metrics: metrics,
}
peers := []p2p.NodeAddress{}

View File

@@ -800,3 +800,18 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP
return r0, r1
}
type NewClientT interface {
mock.TestingT
Cleanup(func())
}
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewClient(t NewClientT) *Client {
mock := &Client{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -1,3 +1,15 @@
#!/bin/sh
#
# Invoke Mockery v2 to update generated mocks for the given type.
#
# This script runs a locally-installed "mockery" if available, otherwise it
# runs the published Docker container. This legerdemain is so that the CI build
# and a local build can work off the same script.
#
if ! which mockery ; then
mockery() {
docker run --rm -v "$PWD":/w --workdir=/w vektra/mockery:v2.12.3
}
fi
go run github.com/vektra/mockery/v2 --disable-version-string --case underscore --name $*
mockery --disable-version-string --case underscore --name "$@"