mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-12 07:42:48 +00:00
Compare commits
2 Commits
v035-mempo
...
wb/experim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fa14accff | ||
|
|
af9bca5013 |
@@ -801,3 +801,18 @@ func (_m *Client) String() string {
|
||||
func (_m *Client) Wait() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
type NewClientT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewClient(t NewClientT) *Client {
|
||||
mock := &Client{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -26,3 +26,18 @@ func (_m *ConsSyncReactor) SetStateSyncingMetrics(_a0 float64) {
|
||||
func (_m *ConsSyncReactor) SwitchToConsensus(_a0 state.State, _a1 bool) {
|
||||
_m.Called(_a0, _a1)
|
||||
}
|
||||
|
||||
type NewConsSyncReactorT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewConsSyncReactor creates a new instance of ConsSyncReactor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewConsSyncReactor(t NewConsSyncReactorT) *ConsSyncReactor {
|
||||
mock := &ConsSyncReactor{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -57,3 +57,18 @@ func (_m *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
type NewBlockStoreT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewBlockStore(t NewBlockStoreT) *BlockStore {
|
||||
mock := &BlockStore{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
|
||||
p2p "github.com/tendermint/tendermint/internal/p2p"
|
||||
|
||||
time "time"
|
||||
|
||||
types "github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -49,20 +51,20 @@ func (_m *Connection) FlushClose() error {
|
||||
return r0
|
||||
}
|
||||
|
||||
// Handshake provides a mock function with given fields: _a0, _a1, _a2
|
||||
func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
|
||||
ret := _m.Called(_a0, _a1, _a2)
|
||||
// Handshake provides a mock function with given fields: _a0, _a1, _a2, _a3
|
||||
func (_m *Connection) Handshake(_a0 context.Context, _a1 time.Duration, _a2 types.NodeInfo, _a3 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
|
||||
ret := _m.Called(_a0, _a1, _a2, _a3)
|
||||
|
||||
var r0 types.NodeInfo
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok {
|
||||
r0 = rf(_a0, _a1, _a2)
|
||||
if rf, ok := ret.Get(0).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok {
|
||||
r0 = rf(_a0, _a1, _a2, _a3)
|
||||
} else {
|
||||
r0 = ret.Get(0).(types.NodeInfo)
|
||||
}
|
||||
|
||||
var r1 crypto.PubKey
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok {
|
||||
r1 = rf(_a0, _a1, _a2)
|
||||
if rf, ok := ret.Get(1).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok {
|
||||
r1 = rf(_a0, _a1, _a2, _a3)
|
||||
} else {
|
||||
if ret.Get(1) != nil {
|
||||
r1 = ret.Get(1).(crypto.PubKey)
|
||||
@@ -70,8 +72,8 @@ func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 cry
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(context.Context, types.NodeInfo, crypto.PrivKey) error); ok {
|
||||
r2 = rf(_a0, _a1, _a2)
|
||||
if rf, ok := ret.Get(2).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) error); ok {
|
||||
r2 = rf(_a0, _a1, _a2, _a3)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
@@ -206,3 +208,18 @@ func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
type NewConnectionT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewConnection(t NewConnectionT) *Connection {
|
||||
mock := &Connection{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -332,3 +332,18 @@ func (_m *Peer) TrySend(_a0 byte, _a1 []byte) bool {
|
||||
func (_m *Peer) Wait() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
type NewPeerT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewPeer(t NewPeerT) *Peer {
|
||||
mock := &Peer{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -119,3 +119,18 @@ func (_m *Transport) String() string {
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
type NewTransportT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewTransport creates a new instance of Transport. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewTransport(t NewTransportT) *Transport {
|
||||
mock := &Transport{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package p2ptest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -254,7 +253,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
|
||||
privKey,
|
||||
peerManager,
|
||||
[]p2p.Transport{transport},
|
||||
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
|
||||
p2p.RouterOptions{},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, router.Start())
|
||||
|
||||
@@ -90,7 +90,7 @@ func createOutboundPeerAndPerformHandshake(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
peerInfo, _, err := pc.conn.Handshake(context.Background(), ourNodeInfo, pk)
|
||||
peerInfo, _, err := pc.conn.Handshake(context.Background(), 0, ourNodeInfo, pk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -187,7 +187,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey)
|
||||
_, _, err = pc.conn.Handshake(context.Background(), 0, rp.nodeInfo(), rp.PrivKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -213,7 +213,7 @@ func (rp *remotePeer) accept() {
|
||||
if err != nil {
|
||||
golog.Printf("Failed to create a peer: %+v", err)
|
||||
}
|
||||
_, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey)
|
||||
_, _, err = pc.conn.Handshake(context.Background(), 0, rp.nodeInfo(), rp.PrivKey)
|
||||
if err != nil {
|
||||
golog.Printf("Failed to handshake a peer: %+v", err)
|
||||
}
|
||||
|
||||
@@ -162,6 +162,10 @@ type PeerManagerOptions struct {
|
||||
// retry times, to avoid thundering herds. 0 disables jitter.
|
||||
RetryTimeJitter time.Duration
|
||||
|
||||
// DisconnectCooldownPeriod is the amount of time after we
|
||||
// disconnect from a peer before we'll consider dialing a new peer
|
||||
DisconnectCooldownPeriod time.Duration
|
||||
|
||||
// PeerScores sets fixed scores for specific peers. It is mainly used
|
||||
// for testing. A score of 0 is ignored.
|
||||
PeerScores map[types.NodeID]PeerScore
|
||||
@@ -570,6 +574,10 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, addressInfo := range peer.AddressInfo {
|
||||
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
|
||||
continue
|
||||
@@ -885,6 +893,22 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) {
|
||||
delete(m.evicting, peerID)
|
||||
delete(m.ready, peerID)
|
||||
|
||||
if peer, ok := m.store.Get(peerID); ok {
|
||||
peer.LastDisconnected = time.Now()
|
||||
_ = m.store.Set(peer)
|
||||
// launch a thread to ping the dialWaker when the
|
||||
// disconnected peer can be dialed again.
|
||||
go func() {
|
||||
timer := time.NewTimer(m.options.DisconnectCooldownPeriod)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
m.dialWaker.Wake()
|
||||
case <-m.closeCh:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if ready {
|
||||
m.broadcast(PeerUpdate{
|
||||
NodeID: peerID,
|
||||
@@ -1447,9 +1471,10 @@ func (s *peerStore) Size() int {
|
||||
|
||||
// peerInfo contains peer information stored in a peerStore.
|
||||
type peerInfo struct {
|
||||
ID types.NodeID
|
||||
AddressInfo map[NodeAddress]*peerAddressInfo
|
||||
LastConnected time.Time
|
||||
ID types.NodeID
|
||||
AddressInfo map[NodeAddress]*peerAddressInfo
|
||||
LastConnected time.Time
|
||||
LastDisconnected time.Time
|
||||
|
||||
// These fields are ephemeral, i.e. not persisted to the database.
|
||||
Persistent bool
|
||||
@@ -1489,8 +1514,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
|
||||
func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
|
||||
msg := &p2pproto.PeerInfo{
|
||||
ID: string(p.ID),
|
||||
LastConnected: &p.LastConnected,
|
||||
Inactive: p.Inactive,
|
||||
LastConnected: &p.LastConnected,
|
||||
}
|
||||
for _, addressInfo := range p.AddressInfo {
|
||||
msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto())
|
||||
@@ -1498,6 +1523,7 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
|
||||
if msg.LastConnected.IsZero() {
|
||||
msg.LastConnected = nil
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"runtime"
|
||||
"sync"
|
||||
@@ -160,12 +159,6 @@ type RouterOptions struct {
|
||||
// return an error to reject the peer.
|
||||
FilterPeerByID func(context.Context, types.NodeID) error
|
||||
|
||||
// DialSleep controls the amount of time that the router
|
||||
// sleeps between dialing peers. If not set, a default value
|
||||
// is used that sleeps for a (random) amount of time up to 3
|
||||
// seconds between submitting each peer to be dialed.
|
||||
DialSleep func(context.Context)
|
||||
|
||||
// NumConcrruentDials controls how many parallel go routines
|
||||
// are used to dial peers. This defaults to the value of
|
||||
// runtime.NumCPU.
|
||||
@@ -552,30 +545,6 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
|
||||
return r.options.FilterPeerByID(ctx, id)
|
||||
}
|
||||
|
||||
func (r *Router) dialSleep(ctx context.Context) {
|
||||
if r.options.DialSleep == nil {
|
||||
const (
|
||||
maxDialerInterval = 500
|
||||
minDialerInterval = 100
|
||||
)
|
||||
|
||||
// nolint:gosec // G404: Use of weak random number generator
|
||||
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
|
||||
|
||||
timer := time.NewTimer(dur * time.Millisecond)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
r.options.DialSleep(ctx)
|
||||
}
|
||||
|
||||
// acceptPeers accepts inbound connections from peers on the given transport,
|
||||
// and spawns goroutines that route messages to/from them.
|
||||
func (r *Router) acceptPeers(transport Transport) {
|
||||
@@ -704,12 +673,7 @@ LOOP:
|
||||
|
||||
select {
|
||||
case addresses <- address:
|
||||
// this jitters the frequency that we call
|
||||
// DialNext and prevents us from attempting to
|
||||
// create connections too quickly.
|
||||
|
||||
r.dialSleep(ctx)
|
||||
continue
|
||||
continue LOOP
|
||||
case <-ctx.Done():
|
||||
close(addresses)
|
||||
break LOOP
|
||||
@@ -829,13 +793,7 @@ func (r *Router) handshakePeer(
|
||||
expectID types.NodeID,
|
||||
) (types.NodeInfo, crypto.PubKey, error) {
|
||||
|
||||
if r.options.HandshakeTimeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey)
|
||||
peerInfo, peerKey, err := conn.Handshake(ctx, r.options.HandshakeTimeout, r.nodeInfo, r.privKey)
|
||||
if err != nil {
|
||||
return peerInfo, peerKey, err
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package p2p_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -352,7 +351,7 @@ func TestRouter_AcceptPeers(t *testing.T) {
|
||||
closer := tmsync.NewCloser()
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
Return(tc.peerInfo, tc.peerKey, nil)
|
||||
mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil)
|
||||
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
|
||||
@@ -462,7 +461,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
|
||||
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
|
||||
mockConnection.On("Close").Return(nil)
|
||||
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
|
||||
@@ -543,7 +542,7 @@ func TestRouter_DialPeers(t *testing.T) {
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
if tc.dialErr == nil {
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
Return(tc.peerInfo, tc.peerKey, nil)
|
||||
mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil)
|
||||
}
|
||||
@@ -630,7 +629,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
|
||||
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
|
||||
mockConnection.On("Close").Return(nil)
|
||||
|
||||
@@ -671,7 +670,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
|
||||
peerManager,
|
||||
[]p2p.Transport{mockTransport},
|
||||
p2p.RouterOptions{
|
||||
DialSleep: func(_ context.Context) {},
|
||||
NumConcurrentDials: func() int {
|
||||
ncpu := runtime.NumCPU()
|
||||
if ncpu <= 3 {
|
||||
@@ -710,7 +708,7 @@ func TestRouter_EvictPeers(t *testing.T) {
|
||||
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
Return(peerInfo, peerKey.PubKey(), nil)
|
||||
mockConnection.On("ReceiveMessage").WaitUntil(closeCh).Return(chID, nil, io.EOF)
|
||||
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
|
||||
@@ -779,7 +777,7 @@ func TestRouter_ChannelCompatability(t *testing.T) {
|
||||
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
Return(incompatiblePeer, peerKey.PubKey(), nil)
|
||||
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
|
||||
mockConnection.On("Close").Return(nil)
|
||||
@@ -828,7 +826,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
|
||||
|
||||
mockConnection := &mocks.Connection{}
|
||||
mockConnection.On("String").Maybe().Return("mock")
|
||||
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
|
||||
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
|
||||
Return(peer, peerKey.PubKey(), nil)
|
||||
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
|
||||
mockConnection.On("Close").Return(nil)
|
||||
|
||||
@@ -865,11 +865,11 @@ func (sw *Switch) handshakePeer(
|
||||
c Connection,
|
||||
expectPeerID types.NodeID,
|
||||
) (types.NodeInfo, crypto.PubKey, error) {
|
||||
// Moved from transport and hardcoded until legacy P2P stack removal.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
peerInfo, peerKey, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
// Moved timeout from transport and hardcoded until legacy P2P stack removal.
|
||||
peerInfo, peerKey, err := c.Handshake(ctx, 5*time.Second, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
if err != nil {
|
||||
return peerInfo, peerKey, ErrRejected{
|
||||
conn: c.(*mConnConnection).conn,
|
||||
|
||||
@@ -267,7 +267,7 @@ func TestSwitchPeerFilter(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -324,7 +324,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -360,7 +360,7 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -415,7 +415,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
||||
if err != nil {
|
||||
if err := conn.Close(); err != nil {
|
||||
sw.Logger.Error("Error closing connection", "err", err)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/internal/p2p/conn"
|
||||
@@ -84,7 +85,7 @@ type Connection interface {
|
||||
// FIXME: The handshake should really be the Router's responsibility, but
|
||||
// that requires the connection interface to be byte-oriented rather than
|
||||
// message-oriented (see comment above).
|
||||
Handshake(context.Context, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)
|
||||
Handshake(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)
|
||||
|
||||
// ReceiveMessage returns the next message received on the connection,
|
||||
// blocking until one is available. Returns io.EOF if closed.
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/netutil"
|
||||
|
||||
@@ -255,6 +256,7 @@ func newMConnConnection(
|
||||
// Handshake implements Connection.
|
||||
func (c *mConnConnection) Handshake(
|
||||
ctx context.Context,
|
||||
timeout time.Duration,
|
||||
nodeInfo types.NodeInfo,
|
||||
privKey crypto.PrivKey,
|
||||
) (types.NodeInfo, crypto.PubKey, error) {
|
||||
@@ -264,6 +266,12 @@ func (c *mConnConnection) Handshake(
|
||||
peerKey crypto.PubKey
|
||||
errCh = make(chan error, 1)
|
||||
)
|
||||
handshakeCtx := ctx
|
||||
if timeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
handshakeCtx, cancel = context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
}
|
||||
// To handle context cancellation, we need to do the handshake in a
|
||||
// goroutine and abort the blocking network calls by closing the connection
|
||||
// when the context is canceled.
|
||||
@@ -276,12 +284,17 @@ func (c *mConnConnection) Handshake(
|
||||
}
|
||||
}()
|
||||
var err error
|
||||
mconn, peerInfo, peerKey, err = c.handshake(ctx, nodeInfo, privKey)
|
||||
errCh <- err
|
||||
mconn, peerInfo, peerKey, err = c.handshake(handshakeCtx, nodeInfo, privKey)
|
||||
|
||||
select {
|
||||
case errCh <- err:
|
||||
case <-handshakeCtx.Done():
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-handshakeCtx.Done():
|
||||
_ = c.Close()
|
||||
return types.NodeInfo{}, nil, ctx.Err()
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
|
||||
@@ -270,9 +271,16 @@ func (c *MemoryConnection) Status() conn.ConnectionStatus {
|
||||
// Handshake implements Connection.
|
||||
func (c *MemoryConnection) Handshake(
|
||||
ctx context.Context,
|
||||
timeout time.Duration,
|
||||
nodeInfo types.NodeInfo,
|
||||
privKey crypto.PrivKey,
|
||||
) (types.NodeInfo, crypto.PubKey, error) {
|
||||
if timeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
select {
|
||||
case c.sendCh <- memoryMessage{nodeInfo: &nodeInfo, pubKey: privKey.PubKey()}:
|
||||
c.logger.Debug("sent handshake", "nodeInfo", nodeInfo)
|
||||
|
||||
@@ -265,7 +265,7 @@ func TestConnection_Handshake(t *testing.T) {
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
// Must use assert due to goroutine.
|
||||
peerInfo, peerKey, err := ba.Handshake(ctx, bInfo, bKey)
|
||||
peerInfo, peerKey, err := ba.Handshake(ctx, 0, bInfo, bKey)
|
||||
if err == nil {
|
||||
assert.Equal(t, aInfo, peerInfo)
|
||||
assert.Equal(t, aKey.PubKey(), peerKey)
|
||||
@@ -273,7 +273,7 @@ func TestConnection_Handshake(t *testing.T) {
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
peerInfo, peerKey, err := ab.Handshake(ctx, aInfo, aKey)
|
||||
peerInfo, peerKey, err := ab.Handshake(ctx, 0, aInfo, aKey)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, bInfo, peerInfo)
|
||||
require.Equal(t, bKey.PubKey(), peerKey)
|
||||
@@ -291,7 +291,7 @@ func TestConnection_HandshakeCancel(t *testing.T) {
|
||||
ab, ba := dialAccept(t, a, b)
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
cancel()
|
||||
_, _, err := ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey())
|
||||
_, _, err := ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey())
|
||||
require.Error(t, err)
|
||||
require.Equal(t, context.Canceled, err)
|
||||
_ = ab.Close()
|
||||
@@ -301,7 +301,7 @@ func TestConnection_HandshakeCancel(t *testing.T) {
|
||||
ab, ba = dialAccept(t, a, b)
|
||||
timeoutCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond)
|
||||
defer cancel()
|
||||
_, _, err = ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey())
|
||||
_, _, err = ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey())
|
||||
require.Error(t, err)
|
||||
require.Equal(t, context.DeadlineExceeded, err)
|
||||
_ = ab.Close()
|
||||
@@ -630,13 +630,13 @@ func dialAcceptHandshake(t *testing.T, a, b p2p.Transport) (p2p.Connection, p2p.
|
||||
go func() {
|
||||
privKey := ed25519.GenPrivKey()
|
||||
nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())}
|
||||
_, _, err := ba.Handshake(ctx, nodeInfo, privKey)
|
||||
_, _, err := ba.Handshake(ctx, 0, nodeInfo, privKey)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
privKey := ed25519.GenPrivKey()
|
||||
nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())}
|
||||
_, _, err := ab.Handshake(ctx, nodeInfo, privKey)
|
||||
_, _, err := ab.Handshake(ctx, 0, nodeInfo, privKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
timer := time.NewTimer(2 * time.Second)
|
||||
|
||||
@@ -150,3 +150,18 @@ func (_m *AppConnConsensus) InitChainSync(_a0 context.Context, _a1 types.Request
|
||||
func (_m *AppConnConsensus) SetResponseCallback(_a0 abciclient.Callback) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
type NewAppConnConsensusT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewAppConnConsensus creates a new instance of AppConnConsensus. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewAppConnConsensus(t NewAppConnConsensusT) *AppConnConsensus {
|
||||
mock := &AppConnConsensus{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -118,3 +118,18 @@ func (_m *AppConnMempool) FlushSync(_a0 context.Context) error {
|
||||
func (_m *AppConnMempool) SetResponseCallback(_a0 abciclient.Callback) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
type NewAppConnMempoolT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewAppConnMempool creates a new instance of AppConnMempool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewAppConnMempool(t NewAppConnMempoolT) *AppConnMempool {
|
||||
mock := &AppConnMempool{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -97,3 +97,18 @@ func (_m *AppConnQuery) QuerySync(_a0 context.Context, _a1 types.RequestQuery) (
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
type NewAppConnQueryT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewAppConnQuery creates a new instance of AppConnQuery. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewAppConnQuery(t NewAppConnQueryT) *AppConnQuery {
|
||||
mock := &AppConnQuery{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -120,3 +120,18 @@ func (_m *AppConnSnapshot) OfferSnapshotSync(_a0 context.Context, _a1 types.Requ
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
type NewAppConnSnapshotT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewAppConnSnapshot creates a new instance of AppConnSnapshot. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewAppConnSnapshot(t NewAppConnSnapshotT) *AppConnSnapshot {
|
||||
mock := &AppConnSnapshot{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -165,3 +165,18 @@ func (_m *EventSink) Type() indexer.EventSinkType {
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
type NewEventSinkT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewEventSink creates a new instance of EventSink. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewEventSink(t NewEventSinkT) *EventSink {
|
||||
mock := &EventSink{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -208,3 +208,18 @@ func (_m *BlockStore) Size() int64 {
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
type NewBlockStoreT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewBlockStore(t NewBlockStoreT) *BlockStore {
|
||||
mock := &BlockStore{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -68,3 +68,18 @@ func (_m *EvidencePool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64
|
||||
func (_m *EvidencePool) Update(_a0 state.State, _a1 types.EvidenceList) {
|
||||
_m.Called(_a0, _a1)
|
||||
}
|
||||
|
||||
type NewEvidencePoolT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewEvidencePool creates a new instance of EvidencePool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewEvidencePool(t NewEvidencePoolT) *EvidencePool {
|
||||
mock := &EvidencePool{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -186,3 +186,18 @@ func (_m *Store) SaveValidatorSets(_a0 int64, _a1 int64, _a2 *types.ValidatorSet
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
type NewStoreT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewStore(t NewStoreT) *Store {
|
||||
mock := &Store{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -82,3 +82,18 @@ func (_m *StateProvider) State(ctx context.Context, height uint64) (state.State,
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
type NewStateProviderT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewStateProvider creates a new instance of StateProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewStateProvider(t NewStateProviderT) *StateProvider {
|
||||
mock := &StateProvider{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -1018,7 +1018,12 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
|
||||
|
||||
// process all the responses as they come in
|
||||
for i := 0; i < cap(witnessResponsesC); i++ {
|
||||
response := <-witnessResponsesC
|
||||
var response witnessResponse
|
||||
select {
|
||||
case response = <-witnessResponsesC:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
switch response.err {
|
||||
// success! We have found a new primary
|
||||
case nil:
|
||||
@@ -1047,10 +1052,6 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
|
||||
// return the light block that new primary responded with
|
||||
return response.lb, nil
|
||||
|
||||
// catch canceled contexts or deadlines
|
||||
case context.Canceled, context.DeadlineExceeded:
|
||||
return nil, response.err
|
||||
|
||||
// process benign errors by logging them only
|
||||
case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
|
||||
lastError = response.err
|
||||
|
||||
@@ -51,3 +51,18 @@ func (_m *Provider) ReportEvidence(_a0 context.Context, _a1 types.Evidence) erro
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
type NewProviderT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewProvider(t NewProviderT) *Provider {
|
||||
mock := &Provider{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -99,3 +99,18 @@ func (_m *LightClient) VerifyLightBlockAtHeight(ctx context.Context, height int6
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
type NewLightClientT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewLightClient creates a new instance of LightClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewLightClient(t NewLightClientT) *LightClient {
|
||||
mock := &LightClient{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -1246,7 +1246,9 @@ func createAndStartPrivValidatorGRPCClient(
|
||||
|
||||
func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
|
||||
opts := p2p.RouterOptions{
|
||||
QueueType: conf.P2P.QueueType,
|
||||
QueueType: conf.P2P.QueueType,
|
||||
HandshakeTimeout: conf.P2P.HandshakeTimeout,
|
||||
DialTimeout: conf.P2P.DialTimeout,
|
||||
}
|
||||
|
||||
if conf.P2P.MaxNumInboundPeers > 0 {
|
||||
|
||||
@@ -504,17 +504,18 @@ func createPeerManager(
|
||||
const maxUpgradeConns = 4
|
||||
|
||||
options := p2p.PeerManagerOptions{
|
||||
SelfAddress: selfAddr,
|
||||
MaxConnected: maxConns,
|
||||
MaxOutgoingConnections: maxOutgoingConns,
|
||||
MaxConnectedUpgrade: maxUpgradeConns,
|
||||
MaxPeers: maxUpgradeConns + 4*maxConns,
|
||||
MinRetryTime: 250 * time.Millisecond,
|
||||
MaxRetryTime: 30 * time.Minute,
|
||||
MaxRetryTimePersistent: 5 * time.Minute,
|
||||
RetryTimeJitter: 5 * time.Second,
|
||||
PrivatePeers: privatePeerIDs,
|
||||
Metrics: metrics,
|
||||
SelfAddress: selfAddr,
|
||||
MaxConnected: maxConns,
|
||||
MaxOutgoingConnections: maxOutgoingConns,
|
||||
MaxConnectedUpgrade: maxUpgradeConns,
|
||||
DisconnectCooldownPeriod: 2 * time.Second,
|
||||
MaxPeers: maxUpgradeConns + 4*maxConns,
|
||||
MinRetryTime: 250 * time.Millisecond,
|
||||
MaxRetryTime: 30 * time.Minute,
|
||||
MaxRetryTimePersistent: 5 * time.Minute,
|
||||
RetryTimeJitter: 5 * time.Second,
|
||||
PrivatePeers: privatePeerIDs,
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
peers := []p2p.NodeAddress{}
|
||||
|
||||
@@ -800,3 +800,18 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
type NewClientT interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}
|
||||
|
||||
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewClient(t NewClientT) *Client {
|
||||
mock := &Client{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
@@ -1,3 +1,15 @@
|
||||
#!/bin/sh
|
||||
#
|
||||
# Invoke Mockery v2 to update generated mocks for the given type.
|
||||
#
|
||||
# This script runs a locally-installed "mockery" if available, otherwise it
|
||||
# runs the published Docker container. This legerdemain is so that the CI build
|
||||
# and a local build can work off the same script.
|
||||
#
|
||||
if ! which mockery ; then
|
||||
mockery() {
|
||||
docker run --rm -v "$PWD":/w --workdir=/w vektra/mockery:v2.12.3
|
||||
}
|
||||
fi
|
||||
|
||||
go run github.com/vektra/mockery/v2 --disable-version-string --case underscore --name $*
|
||||
mockery --disable-version-string --case underscore --name "$@"
|
||||
|
||||
Reference in New Issue
Block a user