Compare commits

..

1 Commits

Author SHA1 Message Date
William Banfield
43746b379b p2p: remove unused error from TryDialNext 2022-06-21 12:40:14 -04:00
46 changed files with 503 additions and 747 deletions

2
.github/CODEOWNERS vendored
View File

@@ -10,4 +10,4 @@
* @ebuchman @cmwaters @tychoish @williambanfield @creachadair @sergio-mena @jmalicevic @thanethomson @ancazamfir
# Spec related changes can be approved by the protocol design team
/spec @josef-widder @milosevic @cason @sergio-mena @jmalicevic
/spec @josef-widder @milosevic @cason

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 3
steps:
- uses: styfle/cancel-workflow-action@0.10.0
- uses: styfle/cancel-workflow-action@0.9.1
with:
workflow_id: 1041851,1401230,2837803
access_token: ${{ github.token }}

View File

@@ -15,7 +15,7 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/checkout@v3
- uses: bufbuild/buf-setup-action@v1.6.0
- uses: bufbuild/buf-setup-action@v1.5.0
- uses: bufbuild/buf-lint-action@v1
with:
input: 'proto'

View File

@@ -671,7 +671,7 @@ func DefaultP2PConfig() *P2PConfig {
ExternalAddress: "",
UPNP: false,
MaxConnections: 64,
MaxOutgoingConnections: 12,
MaxOutgoingConnections: 32,
MaxIncomingConnectionAttempts: 100,
FlushThrottleTimeout: 100 * time.Millisecond,
// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.

10
go.mod
View File

@@ -25,9 +25,9 @@ require (
github.com/rs/cors v1.8.2
github.com/rs/zerolog v1.27.0
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
github.com/spf13/cobra v1.5.0
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.7.2
github.com/tendermint/tm-db v0.6.6
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9
@@ -42,7 +42,7 @@ require (
github.com/creachadair/taskgroup v0.3.2
github.com/golangci/golangci-lint v1.46.0
github.com/google/go-cmp v0.5.8
github.com/vektra/mockery/v2 v2.14.0
github.com/vektra/mockery/v2 v2.13.1
gotest.tools v2.2.0+incompatible
)
@@ -74,7 +74,7 @@ require (
github.com/charithe/durationcheck v0.0.9 // indirect
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/daixiang0/gci v0.3.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.4.3 // indirect
@@ -199,7 +199,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/subosito/gotenv v1.4.0 // indirect
github.com/sylvia7788/contextcheck v1.0.4 // indirect
github.com/tdakkota/asciicheck v0.1.1 // indirect

16
go.sum
View File

@@ -234,9 +234,8 @@ github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creachadair/atomicfile v0.2.6 h1:FgYxYvGcqREApTY8Nxg8msM6P/KVKK3ob5h9FaRUTNg=
github.com/creachadair/atomicfile v0.2.6/go.mod h1:BRq8Une6ckFneYXZQ+kO7p1ZZP3I2fzVzf28JxrIkBc=
github.com/creachadair/command v0.0.0-20220426235536-a748effdf6a1/go.mod h1:bAM+qFQb/KwWyCc9MLC4U1jvn3XyakqP5QRkds5T6cY=
@@ -1041,9 +1040,8 @@ github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/cobra v1.3.0/go.mod h1:BrRVncBjOJa/eUcVVm9CE+oC6as8k+VYr4NY7WCi9V4=
github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q=
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU=
github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
@@ -1064,9 +1062,8 @@ github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v0.0.0-20170130113145-4d4bfba8f1d1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.1.4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -1076,9 +1073,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/subosito/gotenv v1.4.0 h1:yAzM1+SmVcz5R4tXGsNMu1jUl2aOJXoiWUCEwwnGrvs=
@@ -1126,8 +1122,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vektra/mockery/v2 v2.14.0 h1:KZ1p5Hrn8tiY+LErRMr14HHle6khxo+JKOXLBW/yfqs=
github.com/vektra/mockery/v2 v2.14.0/go.mod h1:bnD1T8tExSgPD1ripLkDbr60JA9VtQeu12P3wgLZd7M=
github.com/vektra/mockery/v2 v2.13.1 h1:Lqs7aZiC7TwZO76fJ/4Zsb3NaO4F7cuuz0mZLYeNwtQ=
github.com/vektra/mockery/v2 v2.13.1/go.mod h1:bnD1T8tExSgPD1ripLkDbr60JA9VtQeu12P3wgLZd7M=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=

View File

@@ -10,13 +10,13 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/inspect/rpc"
tmstrings "github.com/tendermint/tendermint/internal/libs/strings"
rpccore "github.com/tendermint/tendermint/internal/rpc/core"
"github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/state/indexer/sink"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
tmstrings "github.com/tendermint/tendermint/libs/strings"
"github.com/tendermint/tendermint/types"
"golang.org/x/sync/errgroup"

View File

@@ -100,8 +100,7 @@ type MConnection struct {
// used to ensure FlushStop and OnStop
// are safe to call concurrently.
stopMtx sync.Mutex
stopSignal <-chan struct{}
stopMtx sync.Mutex
cancel context.CancelFunc
@@ -208,7 +207,6 @@ func (c *MConnection) OnStart(ctx context.Context) error {
c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{})
c.quitRecvRoutine = make(chan struct{})
c.stopSignal = ctx.Done()
c.setRecvLastMsgAt(time.Now())
go c.sendRoutine(ctx)
go c.recvRoutine(ctx)
@@ -683,8 +681,6 @@ func (ch *channel) sendBytes(bytes []byte) bool {
return true
case <-time.After(defaultSendTimeout):
return false
case <-ch.conn.stopSignal:
return false
}
}

View File

@@ -74,12 +74,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "peers_connected_outgoing",
Help: "Number of peers connected as a result of the peer dialing this node.",
}, labels).With(labelsAndValues...),
PeersEvicted: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peers_evicted",
Help: "Number of peers evicted by this node.",
}, labels).With(labelsAndValues...),
RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
@@ -125,7 +119,6 @@ func NopMetrics() *Metrics {
PeersConnectedFailure: discard.NewCounter(),
PeersConnectedIncoming: discard.NewGauge(),
PeersConnectedOutgoing: discard.NewGauge(),
PeersEvicted: discard.NewCounter(),
RouterPeerQueueRecv: discard.NewHistogram(),
RouterPeerQueueSend: discard.NewHistogram(),
RouterChannelQueueSend: discard.NewHistogram(),

View File

@@ -51,9 +51,6 @@ type Metrics struct {
// this node.
PeersConnectedOutgoing metrics.Gauge
// Number of peers evicted by this node.
PeersEvicted metrics.Counter
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
// before sending on the connection.
//metrics:The time taken to read off of a peer's queue before sending on the connection.

View File

@@ -13,8 +13,6 @@ import (
p2p "github.com/tendermint/tendermint/internal/p2p"
time "time"
types "github.com/tendermint/tendermint/types"
)
@@ -37,20 +35,20 @@ func (_m *Connection) Close() error {
return r0
}
// Handshake provides a mock function with given fields: _a0, _a1, _a2, _a3
func (_m *Connection) Handshake(_a0 context.Context, _a1 time.Duration, _a2 types.NodeInfo, _a3 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
ret := _m.Called(_a0, _a1, _a2, _a3)
// Handshake provides a mock function with given fields: _a0, _a1, _a2
func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
ret := _m.Called(_a0, _a1, _a2)
var r0 types.NodeInfo
if rf, ok := ret.Get(0).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok {
r0 = rf(_a0, _a1, _a2, _a3)
if rf, ok := ret.Get(0).(func(context.Context, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(types.NodeInfo)
}
var r1 crypto.PubKey
if rf, ok := ret.Get(1).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok {
r1 = rf(_a0, _a1, _a2, _a3)
if rf, ok := ret.Get(1).(func(context.Context, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok {
r1 = rf(_a0, _a1, _a2)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(crypto.PubKey)
@@ -58,8 +56,8 @@ func (_m *Connection) Handshake(_a0 context.Context, _a1 time.Duration, _a2 type
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) error); ok {
r2 = rf(_a0, _a1, _a2, _a3)
if rf, ok := ret.Get(2).(func(context.Context, types.NodeInfo, crypto.PrivKey) error); ok {
r2 = rf(_a0, _a1, _a2)
} else {
r2 = ret.Error(2)
}

View File

@@ -252,13 +252,12 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
require.NotNil(t, ep, "transport not listening an endpoint")
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MinRetryTime: 10 * time.Millisecond,
DisconnectCooldownPeriod: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
Metrics: p2p.NopMetrics(),
MinRetryTime: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
MaxPeers: opts.MaxPeers,
MaxConnected: opts.MaxConnected,
Metrics: p2p.NopMetrics(),
})
require.NoError(t, err)
@@ -270,7 +269,7 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
func() *types.NodeInfo { return &nodeInfo },
transport,
ep,
p2p.RouterOptions{},
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
)
require.NoError(t, err)

View File

@@ -144,10 +144,6 @@ type PeerManagerOptions struct {
// retry times, to avoid thundering herds. 0 disables jitter.
RetryTimeJitter time.Duration
// DisconnectCooldownPeriod is the amount of time after we
// disconnect from a peer before we'll consider dialing a new peer
DisconnectCooldownPeriod time.Duration
// PeerScores sets fixed scores for specific peers. It is mainly used
// for testing. A score of 0 is ignored.
PeerScores map[types.NodeID]PeerScore
@@ -221,11 +217,11 @@ func (o *PeerManagerOptions) Validate() error {
return nil
}
// isPersistent checks if a peer is in PersistentPeers. It will panic
// isPersistentPeer checks if a peer is in PersistentPeers. It will panic
// if called before optimize().
func (o *PeerManagerOptions) isPersistent(id types.NodeID) bool {
if o.persistentPeers == nil {
panic("isPersistent() called before optimize()")
panic("isPersistentPeer() called before optimize()")
}
return o.persistentPeers[id]
}
@@ -517,13 +513,12 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
// returned peer.
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
for {
if address := m.TryDialNext(); (address != NodeAddress{}) {
address := m.TryDialNext()
if (address != NodeAddress{}) {
return address, nil
}
select {
case <-m.dialWaker.Sleep():
continue
case <-ctx.Done():
return NodeAddress{}, ctx.Err()
}
@@ -553,19 +548,11 @@ func (m *PeerManager) TryDialNext() NodeAddress {
continue
}
if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
continue
}
for _, addressInfo := range peer.AddressInfo {
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
continue
}
if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
continue
}
// We now have an eligible address to dial. If we're full but have
// upgrade capacity (as checked above), we find a lower-scored peer
// we can replace and mark it as upgrading so noone else claims it.
@@ -693,7 +680,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
return err
}
if upgradeFromPeer != "" && m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
if upgradeFromPeer != "" && m.options.MaxConnected > 0 &&
len(m.connected) >= int(m.options.MaxConnected) {
// Look for an even lower-scored peer that may have appeared since we
// started the upgrade.
if p, ok := m.store.Get(upgradeFromPeer); ok {
@@ -702,11 +690,11 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
}
}
m.evict[upgradeFromPeer] = true
m.evictWaker.Wake()
}
m.metrics.PeersConnectedOutgoing.Add(1)
m.connected[peer.ID] = peerConnectionOutgoing
m.evictWaker.Wake()
return nil
}
@@ -875,22 +863,6 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) {
delete(m.evicting, peerID)
delete(m.ready, peerID)
if peer, ok := m.store.Get(peerID); ok {
peer.LastDisconnected = time.Now()
_ = m.store.Set(peer)
// launch a thread to ping the dialWaker when the
// disconnected peer can be dialed again.
go func() {
timer := time.NewTimer(m.options.DisconnectCooldownPeriod)
defer timer.Stop()
select {
case <-timer.C:
m.dialWaker.Wake()
case <-ctx.Done():
}
}()
}
if ready {
m.broadcast(ctx, PeerUpdate{
NodeID: peerID,
@@ -1263,7 +1235,6 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
type peerStore struct {
db dbm.DB
peers map[types.NodeID]*peerInfo
index map[NodeAddress]types.NodeID
ranked []*peerInfo // cache for Ranked(), nil invalidates cache
}
@@ -1283,7 +1254,6 @@ func newPeerStore(db dbm.DB) (*peerStore, error) {
// loadPeers loads all peers from the database into memory.
func (s *peerStore) loadPeers() error {
peers := map[types.NodeID]*peerInfo{}
addrs := map[NodeAddress]types.NodeID{}
start, end := keyPeerInfoRange()
iter, err := s.db.Iterator(start, end)
@@ -1303,18 +1273,11 @@ func (s *peerStore) loadPeers() error {
return fmt.Errorf("invalid peer data: %w", err)
}
peers[peer.ID] = peer
for addr := range peer.AddressInfo {
// TODO maybe check to see if we've seen this
// addr before for a different peer, there
// could be duplicates.
addrs[addr] = peer.ID
}
}
if iter.Error() != nil {
return iter.Error()
}
s.peers = peers
s.index = addrs
s.ranked = nil // invalidate cache if populated
return nil
}
@@ -1326,12 +1289,6 @@ func (s *peerStore) Get(id types.NodeID) (peerInfo, bool) {
return peer.Copy(), ok
}
// Resolve returns the peer ID for a given node address if known.
func (s *peerStore) Resolve(addr NodeAddress) (types.NodeID, bool) {
id, ok := s.index[addr]
return id, ok
}
// Set stores peer data. The input data will be copied, and can safely be reused
// by the caller.
func (s *peerStore) Set(peer peerInfo) error {
@@ -1360,29 +1317,20 @@ func (s *peerStore) Set(peer peerInfo) error {
// update the existing pointer address.
*current = peer
}
for addr := range peer.AddressInfo {
s.index[addr] = peer.ID
}
return nil
}
// Delete deletes a peer, or does nothing if it does not exist.
func (s *peerStore) Delete(id types.NodeID) error {
peer, ok := s.peers[id]
if !ok {
if _, ok := s.peers[id]; !ok {
return nil
}
for _, addr := range peer.AddressInfo {
delete(s.index, addr.Address)
}
delete(s.peers, id)
s.ranked = nil
if err := s.db.Delete(keyPeerInfo(id)); err != nil {
return err
}
delete(s.peers, id)
s.ranked = nil
return nil
}
@@ -1471,10 +1419,9 @@ func (s *peerStore) Size() int {
// peerInfo contains peer information stored in a peerStore.
type peerInfo struct {
ID types.NodeID
AddressInfo map[NodeAddress]*peerAddressInfo
LastConnected time.Time
LastDisconnected time.Time
ID types.NodeID
AddressInfo map[NodeAddress]*peerAddressInfo
LastConnected time.Time
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
@@ -1514,8 +1461,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
msg := &p2pproto.PeerInfo{
ID: string(p.ID),
Inactive: p.Inactive,
LastConnected: &p.LastConnected,
Inactive: p.Inactive,
}
for _, addressInfo := range p.AddressInfo {
msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto())
@@ -1523,7 +1470,6 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
if msg.LastConnected.IsZero() {
msg.LastConnected = nil
}
return msg
}

View File

@@ -384,14 +384,16 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Add b. We shouldn't be able to dial it, due to MaxConnected.
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Spawn a goroutine to fail a's dial attempt.
@@ -425,7 +427,8 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.DialFailed(ctx, dial))
failed := time.Now()
@@ -455,7 +458,8 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
err = peerManager.Accepted(a.NodeID)
require.NoError(t, err)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
dctx, dcancel := context.WithTimeout(ctx, 300*time.Millisecond)
@@ -486,7 +490,8 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -494,14 +499,16 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// At this point, adding c will not allow dialing it.
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -533,7 +540,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -542,7 +549,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Even though we are at capacity, we should be allowed to dial c for an
@@ -550,7 +558,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
// However, since we're using all upgrade slots now, we can't add and dial
@@ -558,14 +567,16 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// We go through with c's upgrade.
require.NoError(t, peerManager.Dialed(c))
// Still can't dial d.
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Now, if we disconnect a, we should be allowed to dial d because we have a
@@ -581,7 +592,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(e)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -601,7 +613,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -609,7 +622,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Adding c and dialing it will fail, because a is the only connected
@@ -617,7 +631,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
}
@@ -638,19 +653,22 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Adding a's TCP address will not dispense a, since it's already dialing.
added, err = peerManager.Add(aTCP)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Marking a as dialed will still not dispense it.
require.NoError(t, peerManager.Dialed(a))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Adding b and accepting a connection from it will not dispense it either.
@@ -658,7 +676,8 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(bID))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -687,14 +706,16 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
// All addresses should be dispensed as long as dialing them has failed.
dial := []p2p.NodeAddress{}
for range addresses {
address := peerManager.TryDialNext()
address, err := peerManager.TryDialNext()
require.NoError(t, err)
require.NotZero(t, address)
require.NoError(t, peerManager.DialFailed(ctx, address))
dial = append(dial, address)
}
require.ElementsMatch(t, dial, addresses)
address := peerManager.TryDialNext()
address, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, address)
}
@@ -719,14 +740,15 @@ func TestPeerManager_DialFailed(t *testing.T) {
// Dialing and then calling DialFailed with a different address (same
// NodeID) should unmark as dialing and allow us to dial the other address
// again, but not register the failed address.
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.DialFailed(ctx, p2p.NodeAddress{
Protocol: "tcp", NodeID: aID, Hostname: "localhost"}))
require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Calling DialFailed on same address twice should be fine.
@@ -760,7 +782,8 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -768,7 +791,8 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Adding c and dialing it will fail, even though it could upgrade a and we
@@ -777,12 +801,14 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
// Failing b's dial will now make c available for dialing.
require.NoError(t, peerManager.DialFailed(ctx, b))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
}
@@ -797,7 +823,8 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -807,7 +834,8 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
require.NoError(t, peerManager.Accepted(b.NodeID))
@@ -836,7 +864,8 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Marking b as dialed in the meanwhile (even without TryDialNext)
@@ -878,7 +907,8 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
require.NoError(t, peerManager.Dialed(c))
@@ -922,7 +952,8 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
require.NoError(t, peerManager.Dialed(b))
@@ -931,7 +962,8 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
// a should now be evicted.
@@ -977,7 +1009,8 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
// In the meanwhile, a disconnects and d connects. d is even lower-scored
@@ -1030,7 +1063,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
@@ -1076,7 +1109,8 @@ func TestPeerManager_Accepted(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
require.NoError(t, peerManager.Accepted(c.NodeID))
require.Error(t, peerManager.Dialed(c))
@@ -1085,7 +1119,8 @@ func TestPeerManager_Accepted(t *testing.T) {
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, d, dial)
require.NoError(t, peerManager.Dialed(d))
require.Error(t, peerManager.Accepted(d.NodeID))
@@ -1236,7 +1271,8 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// a has already been claimed as an upgrade of a, so accepting
@@ -1410,7 +1446,8 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
added, err := peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
require.NoError(t, peerManager.Dialed(b))
}()
@@ -1544,11 +1581,13 @@ func TestPeerManager_Disconnected(t *testing.T) {
// Disconnecting a dialing peer does not unmark it as dialing, to avoid
// dialing it multiple times in parallel.
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
peerManager.Disconnected(ctx, a.NodeID)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -1621,7 +1660,8 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
// Outbound connection with peer error and eviction.
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.Empty(t, sub.Updates())
@@ -1644,7 +1684,8 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
// Outbound connection with dial failure.
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.Empty(t, sub.Updates())
@@ -1749,7 +1790,8 @@ func TestPeerManager_Close(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.DialFailed(ctx, a))
}

View File

@@ -151,9 +151,7 @@ func (s *pqScheduler) closed() <-chan struct{} { return s.done }
// non-empty, we pop the top Envelope and send it on the dequeueCh.
func (s *pqScheduler) process(ctx context.Context) {
defer close(s.done)
dequeueReady := make(chan struct{}, 1)
LOOP:
for {
select {
case e := <-s.enqueueCh:
@@ -241,36 +239,28 @@ LOOP:
)
}
}
select {
case dequeueReady <- struct{}{}:
default:
}
case <-dequeueReady:
// dequeue
for s.pq.Len() > 0 {
pqEnv := heap.Pop(s.pq).(*pqEnvelope)
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.lc.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
s.metrics.PeerPendingSendBytes.With(
"peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID))
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.lc.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
s.metrics.PeerPendingSendBytes.With(
"peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
default:
heap.Push(s.pq, pqEnv)
select {
case dequeueReady <- struct{}{}:
default:
}
continue LOOP
case <-s.closeCh:
return
}
}
case <-ctx.Done():

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"runtime"
"sync"
@@ -61,6 +62,12 @@ type RouterOptions struct {
// return an error to reject the peer.
FilterPeerByID func(context.Context, types.NodeID) error
// DialSleep controls the amount of time that the router
// sleeps between dialing peers. If not set, a default value
// is used that sleeps for a (random) amount of time up to 3
// seconds between submitting each peer to be dialed.
DialSleep func(context.Context)
// NumConcrruentDials controls how many parallel go routines
// are used to dial peers. This defaults to the value of
// runtime.NumCPU.
@@ -303,7 +310,11 @@ func (r *Router) routeChannel(
) {
for {
select {
case envelope := <-outCh:
case envelope, ok := <-outCh:
if !ok {
return
}
// Mark the envelope with the channel ID to allow sendPeer() to pass
// it on to Transport.SendMessage().
envelope.ChannelID = chID
@@ -380,22 +391,20 @@ func (r *Router) routeChannel(
}
}
case peerError := <-errCh:
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
case peerError, ok := <-errCh:
if !ok {
return
}
shouldEvict := peerError.Fatal || r.peerManager.HasMaxPeerCapacity()
r.logger.Error("peer error",
"peer", peerError.NodeID,
"err", peerError.Err,
"disconnecting", peerError.Fatal || maxPeerCapacity,
"evicting", shouldEvict,
)
if peerError.Fatal || maxPeerCapacity {
// if the error is fatal or all peer
// slots are in use, we can error
// (disconnect) from the peer.
if shouldEvict {
r.peerManager.Errored(peerError.NodeID, peerError.Err)
} else {
// this just decrements the peer
// score.
r.peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: peerError.NodeID,
Status: PeerStatusBad,
@@ -410,7 +419,7 @@ func (r *Router) routeChannel(
func (r *Router) numConcurrentDials() int {
if r.options.NumConcurrentDials == nil {
return runtime.NumCPU() * 32
return runtime.NumCPU()
}
return r.options.NumConcurrentDials()
@@ -432,6 +441,38 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
return r.options.FilterPeerByID(ctx, id)
}
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
// the connTracker (on the other side) only rate
// limits peers for dialing more than once every 10ms,
// so these numbers are safe.
const (
maxDialerInterval = 500 // ms
minDialerInterval = 100 // ms
)
// nolint:gosec // G404: Use of weak random number generator
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
timer := time.NewTimer(dur * time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done():
case <-timer.C:
}
return
}
r.options.DialSleep(ctx)
if !r.peerManager.HasDialedMaxPeers() {
r.peerManager.dialWaker.Wake()
}
}
// acceptPeers accepts inbound connections from peers on the given transport,
// and spawns goroutines that route messages to/from them.
func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
@@ -550,13 +591,19 @@ LOOP:
switch {
case errors.Is(err, context.Canceled):
break LOOP
case address == NodeAddress{}:
continue LOOP
case err != nil:
r.logger.Error("failed to find next peer to dial", "err", err)
break LOOP
}
select {
case addresses <- address:
continue LOOP
// this jitters the frequency that we call
// DialNext and prevents us from attempting to
// create connections too quickly.
r.dialSleep(ctx)
continue
case <-ctx.Done():
close(addresses)
break LOOP
@@ -595,7 +642,6 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
r.peerManager.dialWaker.Wake()
conn.Close()
return
}
@@ -671,8 +717,14 @@ func (r *Router) handshakePeer(
expectID types.NodeID,
) (types.NodeInfo, error) {
if r.options.HandshakeTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout)
defer cancel()
}
nodeInfo := r.nodeInfoProducer()
peerInfo, peerKey, err := conn.Handshake(ctx, r.options.HandshakeTimeout, *nodeInfo, r.privKey)
peerInfo, peerKey, err := conn.Handshake(ctx, *nodeInfo, r.privKey)
if err != nil {
return peerInfo, err
}
@@ -885,8 +937,6 @@ func (r *Router) evictPeers(ctx context.Context) {
queue, ok := r.peerQueues[peerID]
r.peerMtx.RUnlock()
r.metrics.PeersEvicted.Add(1)
if ok {
queue.close()
}

View File

@@ -385,7 +385,7 @@ func TestRouter_AcceptPeers(t *testing.T) {
connCtx, connCancel := context.WithCancel(context.Background())
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(tc.peerInfo, tc.peerKey, nil)
mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe()
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
@@ -500,7 +500,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
mockConnection.On("Close").Return(nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
@@ -588,7 +588,7 @@ func TestRouter_DialPeers(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
if tc.dialErr == nil {
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(tc.peerInfo, tc.peerKey, nil)
mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe()
}
@@ -674,7 +674,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
mockConnection.On("Close").Return(nil)
@@ -715,6 +715,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
mockTransport,
nil,
p2p.RouterOptions{
DialSleep: func(_ context.Context) {},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {
@@ -756,7 +757,7 @@ func TestRouter_EvictPeers(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(peerInfo, peerKey.PubKey(), nil)
mockConnection.On("ReceiveMessage", mock.Anything).WaitUntil(closeCh).Return(chID, nil, io.EOF)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
@@ -825,7 +826,7 @@ func TestRouter_ChannelCompatability(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(incompatiblePeer, peerKey.PubKey(), nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
mockConnection.On("Close").Return(nil)
@@ -876,7 +877,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey).
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(peer, peerKey.PubKey(), nil)
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
mockConnection.On("Close").Return(nil)

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net"
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/types"
@@ -82,7 +81,7 @@ type Connection interface {
// FIXME: The handshake should really be the Router's responsibility, but
// that requires the connection interface to be byte-oriented rather than
// message-oriented (see comment above).
Handshake(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)
Handshake(context.Context, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)
// ReceiveMessage returns the next message received on the connection,
// blocking until one is available. Returns io.EOF if closed.

View File

@@ -9,7 +9,6 @@ import (
"net"
"strconv"
"sync"
"time"
"golang.org/x/net/netutil"
@@ -275,7 +274,6 @@ func newMConnConnection(
// Handshake implements Connection.
func (c *mConnConnection) Handshake(
ctx context.Context,
timeout time.Duration,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
) (types.NodeInfo, crypto.PubKey, error) {
@@ -285,12 +283,6 @@ func (c *mConnConnection) Handshake(
peerKey crypto.PubKey
errCh = make(chan error, 1)
)
handshakeCtx := ctx
if timeout > 0 {
var cancel context.CancelFunc
handshakeCtx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// To handle context cancellation, we need to do the handshake in a
// goroutine and abort the blocking network calls by closing the connection
// when the context is canceled.
@@ -303,29 +295,25 @@ func (c *mConnConnection) Handshake(
}
}()
var err error
mconn, peerInfo, peerKey, err = c.handshake(handshakeCtx, nodeInfo, privKey)
mconn, peerInfo, peerKey, err = c.handshake(ctx, nodeInfo, privKey)
select {
case errCh <- err:
case <-handshakeCtx.Done():
case <-ctx.Done():
}
}()
select {
case <-handshakeCtx.Done():
case <-ctx.Done():
_ = c.Close()
return types.NodeInfo{}, nil, handshakeCtx.Err()
return types.NodeInfo{}, nil, ctx.Err()
case err := <-errCh:
if err != nil {
return types.NodeInfo{}, nil, err
}
c.mconn = mconn
// Start must not use the handshakeCtx. The handshakeCtx may have a
// timeout set that is intended to terminate only the handshake procedure.
// The context passed to Start controls the entire lifecycle of the
// mconn.
if err = c.mconn.Start(ctx); err != nil {
return types.NodeInfo{}, nil, err
}

View File

@@ -7,7 +7,6 @@ import (
"io"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
@@ -274,16 +273,9 @@ func (c *MemoryConnection) RemoteEndpoint() Endpoint {
// Handshake implements Connection.
func (c *MemoryConnection) Handshake(
ctx context.Context,
timeout time.Duration,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
) (types.NodeInfo, crypto.PubKey, error) {
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
select {
case c.sendCh <- memoryMessage{nodeInfo: &nodeInfo, pubKey: privKey.PubKey()}:
c.logger.Debug("sent handshake", "nodeInfo", nodeInfo)

View File

@@ -296,7 +296,7 @@ func TestConnection_Handshake(t *testing.T) {
errCh := make(chan error, 1)
go func() {
// Must use assert due to goroutine.
peerInfo, peerKey, err := ba.Handshake(ctx, 0, bInfo, bKey)
peerInfo, peerKey, err := ba.Handshake(ctx, bInfo, bKey)
if err == nil {
assert.Equal(t, aInfo, peerInfo)
assert.Equal(t, aKey.PubKey(), peerKey)
@@ -307,7 +307,7 @@ func TestConnection_Handshake(t *testing.T) {
}
}()
peerInfo, peerKey, err := ab.Handshake(ctx, 0, aInfo, aKey)
peerInfo, peerKey, err := ab.Handshake(ctx, aInfo, aKey)
require.NoError(t, err)
require.Equal(t, bInfo, peerInfo)
require.Equal(t, bKey.PubKey(), peerKey)
@@ -328,7 +328,7 @@ func TestConnection_HandshakeCancel(t *testing.T) {
ab, ba := dialAccept(ctx, t, a, b)
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
cancel()
_, _, err := ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey())
_, _, err := ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey())
require.Error(t, err)
require.Equal(t, context.Canceled, err)
_ = ab.Close()
@@ -338,7 +338,7 @@ func TestConnection_HandshakeCancel(t *testing.T) {
ab, ba = dialAccept(ctx, t, a, b)
timeoutCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel()
_, _, err = ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey())
_, _, err = ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey())
require.Error(t, err)
require.Equal(t, context.DeadlineExceeded, err)
_ = ab.Close()
@@ -642,13 +642,13 @@ func dialAcceptHandshake(ctx context.Context, t *testing.T, a, b p2p.Transport)
go func() {
privKey := ed25519.GenPrivKey()
nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())}
_, _, err := ba.Handshake(ctx, 0, nodeInfo, privKey)
_, _, err := ba.Handshake(ctx, nodeInfo, privKey)
errCh <- err
}()
privKey := ed25519.GenPrivKey()
nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())}
_, _, err := ab.Handshake(ctx, 0, nodeInfo, privKey)
_, _, err := ab.Handshake(ctx, nodeInfo, privKey)
require.NoError(t, err)
timer := time.NewTimer(2 * time.Second)

View File

@@ -18,7 +18,6 @@ import (
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/libs/strings"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
@@ -27,6 +26,7 @@ import (
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/strings"
"github.com/tendermint/tendermint/rpc/coretypes"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
"github.com/tendermint/tendermint/types"

View File

@@ -61,20 +61,20 @@ func NewDefaultLogger(format, level string) (Logger, error) {
}
func (l defaultLogger) Info(msg string, keyVals ...interface{}) {
l.Logger.Info().Fields(keyVals).Msg(msg)
l.Logger.Info().Fields(getLogFields(keyVals...)).Msg(msg)
}
func (l defaultLogger) Error(msg string, keyVals ...interface{}) {
l.Logger.Error().Fields(keyVals).Msg(msg)
l.Logger.Error().Fields(getLogFields(keyVals...)).Msg(msg)
}
func (l defaultLogger) Debug(msg string, keyVals ...interface{}) {
l.Logger.Debug().Fields(keyVals).Msg(msg)
l.Logger.Debug().Fields(getLogFields(keyVals...)).Msg(msg)
}
func (l defaultLogger) With(keyVals ...interface{}) Logger {
return &defaultLogger{
Logger: l.Logger.With().Fields(keyVals).Logger(),
Logger: l.Logger.With().Fields(getLogFields(keyVals...)).Logger(),
}
}
@@ -99,3 +99,16 @@ func OverrideWithNewLogger(logger Logger, format, level string) error {
ol.Logger = nl.Logger
return nil
}
func getLogFields(keyVals ...interface{}) map[string]interface{} {
if len(keyVals)%2 != 0 {
return nil
}
fields := make(map[string]interface{}, len(keyVals))
for i := 0; i < len(keyVals); i += 2 {
fields[fmt.Sprint(keyVals[i])] = keyVals[i+1]
}
return fields
}

View File

@@ -1034,12 +1034,7 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
// process all the responses as they come in
for i := 0; i < cap(witnessResponsesC); i++ {
var response witnessResponse
select {
case response = <-witnessResponsesC:
case <-ctx.Done():
return nil, ctx.Err()
}
response := <-witnessResponsesC
switch response.err {
// success! We have found a new primary
case nil:
@@ -1068,6 +1063,10 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
// return the light block that new primary responded with
return response.lb, nil
// catch canceled contexts or deadlines
case context.Canceled, context.DeadlineExceeded:
return nil, response.err
// process benign errors by logging them only
case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
lastError = response.err

View File

@@ -715,9 +715,7 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType,
HandshakeTimeout: conf.P2P.HandshakeTimeout,
DialTimeout: conf.P2P.DialTimeout,
QueueType: conf.P2P.QueueType,
}
if conf.FilterPeers && appClient != nil {

View File

@@ -17,7 +17,6 @@ import (
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/evidence"
tmstrings "github.com/tendermint/tendermint/internal/libs/strings"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
@@ -29,6 +28,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
tmstrings "github.com/tendermint/tendermint/libs/strings"
"github.com/tendermint/tendermint/privval"
tmgrpc "github.com/tendermint/tendermint/privval/grpc"
"github.com/tendermint/tendermint/types"
@@ -235,18 +235,17 @@ func createPeerManager(
maxUpgradeConns := uint16(4)
options := p2p.PeerManagerOptions{
SelfAddress: selfAddr,
MaxConnected: maxConns,
MaxOutgoingConnections: maxOutgoingConns,
MaxConnectedUpgrade: maxUpgradeConns,
DisconnectCooldownPeriod: 2 * time.Second,
MaxPeers: maxUpgradeConns + 4*maxConns,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
Metrics: metrics,
SelfAddress: selfAddr,
MaxConnected: maxConns,
MaxOutgoingConnections: maxOutgoingConns,
MaxConnectedUpgrade: maxUpgradeConns,
MaxPeers: maxUpgradeConns + 4*maxConns,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
Metrics: metrics,
}
peers := []p2p.NodeAddress{}

View File

@@ -1,3 +1,4 @@
//nolint: gosec
package main
import (
@@ -76,8 +77,6 @@ func (cli *CLI) generate() error {
return err
}
// nolint: gosec
// G404: Use of weak random number generator (math/rand instead of crypto/rand)
manifests, err := Generate(rand.New(rand.NewSource(randomSeed)), cli.opts)
if err != nil {
return err

View File

@@ -1,34 +0,0 @@
package exec
import (
"context"
"fmt"
"os"
osexec "os/exec"
)
// Command executes a shell command.
func Command(ctx context.Context, args ...string) error {
// nolint: gosec
// G204: Subprocess launched with a potential tainted input or cmd arguments
cmd := osexec.CommandContext(ctx, args[0], args[1:]...)
out, err := cmd.CombinedOutput()
switch err := err.(type) {
case nil:
return nil
case *osexec.ExitError:
return fmt.Errorf("failed to run %q:\n%v", args, string(out))
default:
return err
}
}
// CommandVerbose executes a shell command while displaying its output.
func CommandVerbose(ctx context.Context, args ...string) error {
// nolint: gosec
// G204: Subprocess launched with a potential tainted input or cmd arguments
cmd := osexec.CommandContext(ctx, args[0], args[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}

View File

@@ -1,69 +0,0 @@
package docker
import (
"bytes"
"text/template"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// makeDockerCompose generates a Docker Compose config for a testnet.
func makeDockerCompose(testnet *e2e.Testnet) ([]byte, error) {
// Must use version 2 Docker Compose format, to support IPv6.
tmpl, err := template.New("docker-compose").Funcs(template.FuncMap{
"addUint32": func(x, y uint32) uint32 {
return x + y
},
"isBuiltin": func(protocol e2e.Protocol, mode e2e.Mode) bool {
return mode == e2e.ModeLight || protocol == e2e.ProtocolBuiltin
},
}).Parse(`version: '2.4'
networks:
{{ .Name }}:
labels:
e2e: true
driver: bridge
{{- if .IPv6 }}
enable_ipv6: true
{{- end }}
ipam:
driver: default
config:
- subnet: {{ .IP }}
services:
{{- range .Nodes }}
{{ .Name }}:
labels:
e2e: true
container_name: {{ .Name }}
image: tendermint/e2e-node
{{- if isBuiltin $.ABCIProtocol .Mode }}
entrypoint: /usr/bin/entrypoint-builtin
{{- else if .LogLevel }}
command: start --log-level {{ .LogLevel }}
{{- end }}
init: true
ports:
- 26656
- {{ if .ProxyPort }}{{ addUint32 .ProxyPort 1000 }}:{{ end }}26660
- {{ if .ProxyPort }}{{ .ProxyPort }}:{{ end }}26657
- 6060
volumes:
- ./{{ .Name }}:/tendermint
networks:
{{ $.Name }}:
ipv{{ if $.IPv6 }}6{{ else }}4{{ end}}_address: {{ .IP }}
{{end}}`)
if err != nil {
return nil, err
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, testnet)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

View File

@@ -1,27 +0,0 @@
package docker
import (
"context"
"path/filepath"
"github.com/tendermint/tendermint/test/e2e/pkg/exec"
)
// execCompose runs a Docker Compose command for a testnet.
func execCompose(ctx context.Context, dir string, args ...string) error {
return exec.Command(ctx, append(
[]string{"docker-compose", "--ansi=never", "-f", filepath.Join(dir, "docker-compose.yml")},
args...)...)
}
// execComposeVerbose runs a Docker Compose command for a testnet and displays its output.
func execComposeVerbose(ctx context.Context, dir string, args ...string) error {
return exec.CommandVerbose(ctx, append(
[]string{"docker-compose", "--ansi=never", "-f", filepath.Join(dir, "docker-compose.yml")},
args...)...)
}
// execDocker runs a Docker command.
func execDocker(ctx context.Context, args ...string) error {
return exec.Command(ctx, append([]string{"docker"}, args...)...)
}

View File

@@ -1,140 +0,0 @@
package docker
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/tendermint/tendermint/libs/log"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/exec"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
)
// testnetInfra provides an API for provisioning and manipulating
// infrastructure for a Docker-based testnet.
type testnetInfra struct {
logger log.Logger
testnet *e2e.Testnet
}
var _ infra.TestnetInfra = &testnetInfra{}
// NewTestnetInfra constructs an infrastructure provider that allows for Docker-based
// testnet infrastructure.
func NewTestnetInfra(logger log.Logger, testnet *e2e.Testnet) infra.TestnetInfra {
return &testnetInfra{
logger: logger,
testnet: testnet,
}
}
func (ti *testnetInfra) Setup(ctx context.Context) error {
compose, err := makeDockerCompose(ti.testnet)
if err != nil {
return err
}
// nolint: gosec
// G306: Expect WriteFile permissions to be 0600 or less
err = os.WriteFile(filepath.Join(ti.testnet.Dir, "docker-compose.yml"), compose, 0644)
if err != nil {
return err
}
return nil
}
func (ti *testnetInfra) StartNode(ctx context.Context, node *e2e.Node) error {
return execCompose(ctx, ti.testnet.Dir, "up", "-d", node.Name)
}
func (ti *testnetInfra) DisconnectNode(ctx context.Context, node *e2e.Node) error {
return execDocker(ctx, "network", "disconnect", ti.testnet.Name+"_"+ti.testnet.Name, node.Name)
}
func (ti *testnetInfra) ConnectNode(ctx context.Context, node *e2e.Node) error {
return execDocker(ctx, "network", "connect", ti.testnet.Name+"_"+ti.testnet.Name, node.Name)
}
func (ti *testnetInfra) KillNodeProcess(ctx context.Context, node *e2e.Node) error {
return execCompose(ctx, ti.testnet.Dir, "kill", "-s", "SIGKILL", node.Name)
}
func (ti *testnetInfra) StartNodeProcess(ctx context.Context, node *e2e.Node) error {
return execCompose(ctx, ti.testnet.Dir, "start", node.Name)
}
func (ti *testnetInfra) PauseNodeProcess(ctx context.Context, node *e2e.Node) error {
return execCompose(ctx, ti.testnet.Dir, "pause", node.Name)
}
func (ti *testnetInfra) UnpauseNodeProcess(ctx context.Context, node *e2e.Node) error {
return execCompose(ctx, ti.testnet.Dir, "unpause", node.Name)
}
func (ti *testnetInfra) TerminateNodeProcess(ctx context.Context, node *e2e.Node) error {
return execCompose(ctx, ti.testnet.Dir, "kill", "-s", "SIGTERM", node.Name)
}
func (ti *testnetInfra) Stop(ctx context.Context) error {
return execCompose(ctx, ti.testnet.Dir, "down")
}
func (ti *testnetInfra) Pause(ctx context.Context) error {
return execCompose(ctx, ti.testnet.Dir, "pause")
}
func (ti *testnetInfra) Unpause(ctx context.Context) error {
return execCompose(ctx, ti.testnet.Dir, "unpause")
}
func (ti *testnetInfra) ShowLogs(ctx context.Context) error {
return execComposeVerbose(ctx, ti.testnet.Dir, "logs", "--no-color")
}
func (ti *testnetInfra) ShowNodeLogs(ctx context.Context, node *e2e.Node) error {
return execComposeVerbose(ctx, ti.testnet.Dir, "logs", "--no-color", node.Name)
}
func (ti *testnetInfra) TailLogs(ctx context.Context) error {
return execComposeVerbose(ctx, ti.testnet.Dir, "logs", "--follow")
}
func (ti *testnetInfra) TailNodeLogs(ctx context.Context, node *e2e.Node) error {
return execComposeVerbose(ctx, ti.testnet.Dir, "logs", "--follow", node.Name)
}
func (ti *testnetInfra) Cleanup(ctx context.Context) error {
ti.logger.Info("Removing Docker containers and networks")
// GNU xargs requires the -r flag to not run when input is empty, macOS
// does this by default. Ugly, but works.
xargsR := `$(if [[ $OSTYPE == "linux-gnu"* ]]; then echo -n "-r"; fi)`
err := exec.Command(ctx, "bash", "-c", fmt.Sprintf(
"docker container ls -qa --filter label=e2e | xargs %v docker container rm -f", xargsR))
if err != nil {
return err
}
err = exec.Command(ctx, "bash", "-c", fmt.Sprintf(
"docker network ls -q --filter label=e2e | xargs %v docker network rm", xargsR))
if err != nil {
return err
}
// On Linux, some local files in the volume will be owned by root since Tendermint
// runs as root inside the container, so we need to clean them up from within a
// container running as root too.
absDir, err := filepath.Abs(ti.testnet.Dir)
if err != nil {
return err
}
err = execDocker(ctx, "run", "--rm", "--entrypoint", "", "-v", fmt.Sprintf("%v:/network", absDir),
"tendermint/e2e-node", "sh", "-c", "rm -rf /network/*/")
if err != nil {
return err
}
return nil
}

View File

@@ -1,84 +0,0 @@
package infra
import (
"context"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// TestnetInfra provides an API for manipulating the infrastructure of a
// specific testnet.
type TestnetInfra interface {
//
// Overarching testnet infrastructure management.
//
// Setup generates any necessary configuration for the infrastructure
// provider during testnet setup.
Setup(ctx context.Context) error
// Stop will stop all running processes throughout the testnet without
// destroying any infrastructure.
Stop(ctx context.Context) error
// Pause will pause all processes in the testnet.
Pause(ctx context.Context) error
// Unpause will resume a paused testnet.
Unpause(ctx context.Context) error
// ShowLogs prints all logs for the whole testnet to stdout.
ShowLogs(ctx context.Context) error
// TailLogs tails the logs for all nodes in the testnet, if this is
// supported by the infrastructure provider.
TailLogs(ctx context.Context) error
// Cleanup stops and destroys all running testnet infrastructure and
// deletes any generated files.
Cleanup(ctx context.Context) error
//
// Node management, including node infrastructure.
//
// StartNode provisions infrastructure for the given node and starts it.
StartNode(ctx context.Context, node *e2e.Node) error
// DisconnectNode modifies the specified node's network configuration such
// that it becomes bidirectionally disconnected from the network (it cannot
// see other nodes, and other nodes cannot see it).
DisconnectNode(ctx context.Context, node *e2e.Node) error
// ConnectNode modifies the specified node's network configuration such
// that it can become bidirectionally connected.
ConnectNode(ctx context.Context, node *e2e.Node) error
// ShowNodeLogs prints all logs for the node with the give ID to stdout.
ShowNodeLogs(ctx context.Context, node *e2e.Node) error
// TailNodeLogs tails the logs for a single node, if this is supported by
// the infrastructure provider.
TailNodeLogs(ctx context.Context, node *e2e.Node) error
//
// Node process management.
//
// KillNodeProcess sends SIGKILL to a node's process.
KillNodeProcess(ctx context.Context, node *e2e.Node) error
// StartNodeProcess will start a stopped node's process. Assumes that the
// node's infrastructure has previously been provisioned using
// ProvisionNode.
StartNodeProcess(ctx context.Context, node *e2e.Node) error
// PauseNodeProcess sends a signal to the node's process to pause it.
PauseNodeProcess(ctx context.Context, node *e2e.Node) error
// UnpauseNodeProcess resumes a paused node's process.
UnpauseNodeProcess(ctx context.Context, node *e2e.Node) error
// TerminateNodeProcess sends SIGTERM to a node's process.
TerminateNodeProcess(ctx context.Context, node *e2e.Node) error
}

View File

@@ -1,3 +1,4 @@
//nolint: gosec
package e2e
import (
@@ -466,7 +467,7 @@ func (n Node) AddressRPC() string {
// Client returns an RPC client for a node.
func (n Node) Client() (*rpchttp.HTTP, error) {
return rpchttp.New(fmt.Sprintf("http://%s", n.AddressRPC()))
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort))
}
// Stateless returns true if the node is either a seed node or a light node
@@ -480,8 +481,6 @@ type keyGenerator struct {
}
func newKeyGenerator(seed int64) *keyGenerator {
// nolint: gosec
// G404: Use of weak random number generator (math/rand instead of crypto/rand)
return &keyGenerator{
random: rand.New(rand.NewSource(seed)),
}

View File

@@ -1,32 +1,70 @@
package main
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// Cleanup destroys all infrastructure and removes all generated testnet files.
func Cleanup(ctx context.Context, logger log.Logger, testnetDir string, ti infra.TestnetInfra) error {
if testnetDir == "" {
return errors.New("no testnet directory set")
// Cleanup removes the Docker Compose containers and testnet directory.
func Cleanup(logger log.Logger, testnet *e2e.Testnet) error {
err := cleanupDocker(logger)
if err != nil {
return err
}
return cleanupDir(logger, testnet.Dir)
}
if err := ti.Cleanup(ctx); err != nil {
// cleanupDocker removes all E2E resources (with label e2e=True), regardless
// of testnet.
func cleanupDocker(logger log.Logger) error {
logger.Info("Removing Docker containers and networks")
// GNU xargs requires the -r flag to not run when input is empty, macOS
// does this by default. Ugly, but works.
xargsR := `$(if [[ $OSTYPE == "linux-gnu"* ]]; then echo -n "-r"; fi)`
err := exec("bash", "-c", fmt.Sprintf(
"docker container ls -qa --filter label=e2e | xargs %v docker container rm -f", xargsR))
if err != nil {
return err
}
_, err := os.Stat(testnetDir)
return exec("bash", "-c", fmt.Sprintf(
"docker network ls -q --filter label=e2e | xargs %v docker network rm", xargsR))
}
// cleanupDir cleans up a testnet directory
func cleanupDir(logger log.Logger, dir string) error {
if dir == "" {
return errors.New("no directory set")
}
_, err := os.Stat(dir)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
logger.Info(fmt.Sprintf("Removing testnet directory %q", testnetDir))
return os.RemoveAll(testnetDir)
logger.Info(fmt.Sprintf("Removing testnet directory %q", dir))
// On Linux, some local files in the volume will be owned by root since Tendermint
// runs as root inside the container, so we need to clean them up from within a
// container running as root too.
absDir, err := filepath.Abs(dir)
if err != nil {
return err
}
err = execDocker("run", "--rm", "--entrypoint", "", "-v", fmt.Sprintf("%v:/network", absDir),
"tendermint/e2e-node", "sh", "-c", "rm -rf /network/*/")
if err != nil {
return err
}
return os.RemoveAll(dir)
}

50
test/e2e/runner/exec.go Normal file
View File

@@ -0,0 +1,50 @@
//nolint: gosec
package main
import (
"fmt"
"os"
osexec "os/exec"
"path/filepath"
)
// execute executes a shell command.
func exec(args ...string) error {
cmd := osexec.Command(args[0], args[1:]...)
out, err := cmd.CombinedOutput()
switch err := err.(type) {
case nil:
return nil
case *osexec.ExitError:
return fmt.Errorf("failed to run %q:\n%v", args, string(out))
default:
return err
}
}
// execVerbose executes a shell command while displaying its output.
func execVerbose(args ...string) error {
cmd := osexec.Command(args[0], args[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}
// execCompose runs a Docker Compose command for a testnet.
func execCompose(dir string, args ...string) error {
return exec(append(
[]string{"docker-compose", "--ansi=never", "-f", filepath.Join(dir, "docker-compose.yml")},
args...)...)
}
// execComposeVerbose runs a Docker Compose command for a testnet and displays its output.
func execComposeVerbose(dir string, args ...string) error {
return execVerbose(append(
[]string{"docker-compose", "--ansi=never", "-f", filepath.Join(dir, "docker-compose.yml")},
args...)...)
}
// execDocker runs a Docker command.
func execDocker(args ...string) error {
return exec(append([]string{"docker"}, args...)...)
}

View File

@@ -13,8 +13,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
"github.com/tendermint/tendermint/test/e2e/pkg/infra/docker"
)
const randomSeed = 2308084734268
@@ -35,7 +33,6 @@ func main() {
type CLI struct {
root *cobra.Command
testnet *e2e.Testnet
infra infra.TestnetInfra
preserve bool
}
@@ -56,23 +53,12 @@ func NewCLI(logger log.Logger) *CLI {
if err != nil {
return err
}
providerID, err := cmd.Flags().GetString("provider")
if err != nil {
return err
}
switch providerID {
case "docker":
cli.infra = docker.NewTestnetInfra(logger, testnet)
logger.Info("Using Docker-based infrastructure provider")
default:
return fmt.Errorf("unrecognized infrastructure provider ID: %s", providerID)
}
cli.testnet = testnet
return nil
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
if err = Cleanup(cmd.Context(), logger, cli.testnet.Dir, cli.infra); err != nil {
if err = Cleanup(logger, cli.testnet); err != nil {
return err
}
defer func() {
@@ -81,11 +67,11 @@ func NewCLI(logger log.Logger) *CLI {
} else if err != nil {
logger.Info("Preserving testnet that encountered error",
"err", err)
} else if err := Cleanup(cmd.Context(), logger, cli.testnet.Dir, cli.infra); err != nil {
} else if err := Cleanup(logger, cli.testnet); err != nil {
logger.Error("error cleaning up testnet contents", "err", err)
}
}()
if err = Setup(cmd.Context(), logger, cli.testnet, cli.infra); err != nil {
if err = Setup(logger, cli.testnet); err != nil {
return err
}
@@ -101,7 +87,7 @@ func NewCLI(logger log.Logger) *CLI {
chLoadResult <- Load(lctx, logger, r, cli.testnet)
}()
startAt := time.Now()
if err = Start(ctx, logger, cli.testnet, cli.infra); err != nil {
if err = Start(ctx, logger, cli.testnet); err != nil {
return err
}
@@ -110,7 +96,7 @@ func NewCLI(logger log.Logger) *CLI {
}
if cli.testnet.HasPerturbations() {
if err = Perturb(ctx, logger, cli.testnet, cli.infra); err != nil {
if err = Perturb(ctx, logger, cli.testnet); err != nil {
return err
}
if err = Wait(ctx, logger, cli.testnet, 5); err != nil { // allow some txs to go through
@@ -148,7 +134,7 @@ func NewCLI(logger log.Logger) *CLI {
if err = Wait(ctx, logger, cli.testnet, 5); err != nil { // wait for network to settle before tests
return err
}
if err := Test(ctx, cli.testnet); err != nil {
if err := Test(cli.testnet); err != nil {
return err
}
return nil
@@ -158,8 +144,6 @@ func NewCLI(logger log.Logger) *CLI {
cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest")
_ = cli.root.MarkPersistentFlagRequired("file")
cli.root.PersistentFlags().String("provider", "docker", "Which infrastructure provider to use")
cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false,
"Preserves the running of the test net after tests are completed")
@@ -172,7 +156,7 @@ func NewCLI(logger log.Logger) *CLI {
Use: "setup",
Short: "Generates the testnet directory and configuration",
RunE: func(cmd *cobra.Command, args []string) error {
return Setup(cmd.Context(), logger, cli.testnet, cli.infra)
return Setup(logger, cli.testnet)
},
})
@@ -182,12 +166,12 @@ func NewCLI(logger log.Logger) *CLI {
RunE: func(cmd *cobra.Command, args []string) error {
_, err := os.Stat(cli.testnet.Dir)
if os.IsNotExist(err) {
err = Setup(cmd.Context(), logger, cli.testnet, cli.infra)
err = Setup(logger, cli.testnet)
}
if err != nil {
return err
}
return Start(cmd.Context(), logger, cli.testnet, cli.infra)
return Start(cmd.Context(), logger, cli.testnet)
},
})
@@ -195,7 +179,7 @@ func NewCLI(logger log.Logger) *CLI {
Use: "perturb",
Short: "Perturbs the Docker testnet, e.g. by restarting or disconnecting nodes",
RunE: func(cmd *cobra.Command, args []string) error {
return Perturb(cmd.Context(), logger, cli.testnet, cli.infra)
return Perturb(cmd.Context(), logger, cli.testnet)
},
})
@@ -212,7 +196,7 @@ func NewCLI(logger log.Logger) *CLI {
Short: "Stops the Docker testnet",
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("Stopping testnet")
return cli.infra.Stop(cmd.Context())
return execCompose(cli.testnet.Dir, "down")
},
})
@@ -221,7 +205,7 @@ func NewCLI(logger log.Logger) *CLI {
Short: "Pauses the Docker testnet",
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("Pausing testnet")
return cli.infra.Pause(cmd.Context())
return execCompose(cli.testnet.Dir, "pause")
},
})
@@ -230,7 +214,7 @@ func NewCLI(logger log.Logger) *CLI {
Short: "Resumes the Docker testnet",
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("Resuming testnet")
return cli.infra.Unpause(cmd.Context())
return execCompose(cli.testnet.Dir, "unpause")
},
})
@@ -275,7 +259,7 @@ func NewCLI(logger log.Logger) *CLI {
Use: "test",
Short: "Runs test cases against a running testnet",
RunE: func(cmd *cobra.Command, args []string) error {
return Test(cmd.Context(), cli.testnet)
return Test(cli.testnet)
},
})
@@ -283,24 +267,17 @@ func NewCLI(logger log.Logger) *CLI {
Use: "cleanup",
Short: "Removes the testnet directory",
RunE: func(cmd *cobra.Command, args []string) error {
return Cleanup(cmd.Context(), logger, cli.testnet.Dir, cli.infra)
return Cleanup(logger, cli.testnet)
},
})
cli.root.AddCommand(&cobra.Command{
Use: "logs [node]",
Short: "Shows the testnet or a specific node's logs",
Short: "Shows the testnet or a specefic node's logs",
Example: "runner logs validator03",
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) > 0 {
node := cli.testnet.LookupNode(args[0])
if node == nil {
return fmt.Errorf("no such node: %s", args[0])
}
return cli.infra.ShowNodeLogs(cmd.Context(), node)
}
return cli.infra.ShowLogs(cmd.Context())
return execComposeVerbose(cli.testnet.Dir, append([]string{"logs", "--no-color"}, args...)...)
},
})
@@ -310,13 +287,9 @@ func NewCLI(logger log.Logger) *CLI {
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 1 {
node := cli.testnet.LookupNode(args[0])
if node == nil {
return fmt.Errorf("no such node: %s", args[0])
}
return cli.infra.TailNodeLogs(cmd.Context(), node)
return execComposeVerbose(cli.testnet.Dir, "logs", "--follow", args[0])
}
return cli.infra.TailLogs(cmd.Context())
return execComposeVerbose(cli.testnet.Dir, "logs", "--follow")
},
})
@@ -329,20 +302,20 @@ func NewCLI(logger log.Logger) *CLI {
Min Block Interval
Max Block Interval
over a 100 block sampling period.
Does not run any perbutations.
`,
RunE: func(cmd *cobra.Command, args []string) error {
if err := Cleanup(cmd.Context(), logger, cli.testnet.Dir, cli.infra); err != nil {
if err := Cleanup(logger, cli.testnet); err != nil {
return err
}
defer func() {
if err := Cleanup(cmd.Context(), logger, cli.testnet.Dir, cli.infra); err != nil {
if err := Cleanup(logger, cli.testnet); err != nil {
logger.Error("error cleaning up testnet contents", "err", err)
}
}()
if err := Setup(cmd.Context(), logger, cli.testnet, cli.infra); err != nil {
if err := Setup(logger, cli.testnet); err != nil {
return err
}
@@ -358,7 +331,7 @@ Does not run any perbutations.
chLoadResult <- Load(lctx, logger, r, cli.testnet)
}()
if err := Start(ctx, logger, cli.testnet, cli.infra); err != nil {
if err := Start(ctx, logger, cli.testnet); err != nil {
return err
}

View File

@@ -8,11 +8,10 @@ import (
"github.com/tendermint/tendermint/libs/log"
rpctypes "github.com/tendermint/tendermint/rpc/coretypes"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
)
// Perturbs a running testnet.
func Perturb(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infra.TestnetInfra) error {
func Perturb(ctx context.Context, logger log.Logger, testnet *e2e.Testnet) error {
timer := time.NewTimer(0) // first tick fires immediately; reset below
defer timer.Stop()
@@ -22,7 +21,7 @@ func Perturb(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti in
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
_, err := PerturbNode(ctx, logger, node, perturbation, ti)
_, err := PerturbNode(ctx, logger, node, perturbation)
if err != nil {
return err
}
@@ -37,45 +36,46 @@ func Perturb(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti in
// PerturbNode perturbs a node with a given perturbation, returning its status
// after recovering.
func PerturbNode(ctx context.Context, logger log.Logger, node *e2e.Node, perturbation e2e.Perturbation, ti infra.TestnetInfra) (*rpctypes.ResultStatus, error) {
func PerturbNode(ctx context.Context, logger log.Logger, node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) {
testnet := node.Testnet
switch perturbation {
case e2e.PerturbationDisconnect:
logger.Info(fmt.Sprintf("Disconnecting node %v...", node.Name))
if err := ti.DisconnectNode(ctx, node); err != nil {
if err := execDocker("network", "disconnect", testnet.Name+"_"+testnet.Name, node.Name); err != nil {
return nil, err
}
time.Sleep(10 * time.Second)
if err := ti.ConnectNode(ctx, node); err != nil {
if err := execDocker("network", "connect", testnet.Name+"_"+testnet.Name, node.Name); err != nil {
return nil, err
}
case e2e.PerturbationKill:
logger.Info(fmt.Sprintf("Killing node %v...", node.Name))
if err := ti.KillNodeProcess(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "kill", "-s", "SIGKILL", node.Name); err != nil {
return nil, err
}
time.Sleep(10 * time.Second)
if err := ti.StartNodeProcess(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "start", node.Name); err != nil {
return nil, err
}
case e2e.PerturbationPause:
logger.Info(fmt.Sprintf("Pausing node %v...", node.Name))
if err := ti.PauseNodeProcess(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "pause", node.Name); err != nil {
return nil, err
}
time.Sleep(10 * time.Second)
if err := ti.UnpauseNodeProcess(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "unpause", node.Name); err != nil {
return nil, err
}
case e2e.PerturbationRestart:
logger.Info(fmt.Sprintf("Restarting node %v...", node.Name))
if err := ti.TerminateNodeProcess(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "kill", "-s", "SIGTERM", node.Name); err != nil {
return nil, err
}
time.Sleep(10 * time.Second)
if err := ti.StartNodeProcess(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "start", node.Name); err != nil {
return nil, err
}

View File

@@ -23,7 +23,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
clients = map[string]*rpchttp.HTTP{}
lastHeight int64
lastIncrease = time.Now()
nodesAtHeight = map[string]int64{}
nodesAtHeight = map[string]struct{}{}
numRunningNodes int
)
if height == 0 {
@@ -85,7 +85,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
// add this node to the set of target
// height nodes
nodesAtHeight[node.Name] = result.SyncInfo.LatestBlockHeight
nodesAtHeight[node.Name] = struct{}{}
// if not all of the nodes that we
// have clients for have reached the

View File

@@ -1,8 +1,8 @@
// nolint: gosec
package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
@@ -12,6 +12,7 @@ import (
"regexp"
"sort"
"strings"
"text/template"
"time"
"github.com/BurntSushi/toml"
@@ -21,7 +22,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/privval"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
"github.com/tendermint/tendermint/types"
)
@@ -39,7 +39,7 @@ const (
)
// Setup sets up the testnet configuration.
func Setup(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infra.TestnetInfra) error {
func Setup(logger log.Logger, testnet *e2e.Testnet) error {
logger.Info(fmt.Sprintf("Generating testnet files in %q", testnet.Dir))
err := os.MkdirAll(testnet.Dir, os.ModePerm)
@@ -47,6 +47,15 @@ func Setup(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
return err
}
compose, err := MakeDockerCompose(testnet)
if err != nil {
return err
}
err = os.WriteFile(filepath.Join(testnet.Dir, "docker-compose.yml"), compose, 0644)
if err != nil {
return err
}
genesis, err := MakeGenesis(testnet)
if err != nil {
return err
@@ -83,8 +92,6 @@ func Setup(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
if err != nil {
return err
}
// nolint: gosec
// G306: Expect WriteFile permissions to be 0600 or less
err = os.WriteFile(filepath.Join(nodeDir, "config", "app.toml"), appCfg, 0644)
if err != nil {
return err
@@ -124,13 +131,70 @@ func Setup(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
}
}
if err := ti.Setup(ctx); err != nil {
return err
}
return nil
}
// MakeDockerCompose generates a Docker Compose config for a testnet.
func MakeDockerCompose(testnet *e2e.Testnet) ([]byte, error) {
// Must use version 2 Docker Compose format, to support IPv6.
tmpl, err := template.New("docker-compose").Funcs(template.FuncMap{
"addUint32": func(x, y uint32) uint32 {
return x + y
},
"isBuiltin": func(protocol e2e.Protocol, mode e2e.Mode) bool {
return mode == e2e.ModeLight || protocol == e2e.ProtocolBuiltin
},
}).Parse(`version: '2.4'
networks:
{{ .Name }}:
labels:
e2e: true
driver: bridge
{{- if .IPv6 }}
enable_ipv6: true
{{- end }}
ipam:
driver: default
config:
- subnet: {{ .IP }}
services:
{{- range .Nodes }}
{{ .Name }}:
labels:
e2e: true
container_name: {{ .Name }}
image: tendermint/e2e-node
{{- if isBuiltin $.ABCIProtocol .Mode }}
entrypoint: /usr/bin/entrypoint-builtin
{{- else if .LogLevel }}
command: start --log-level {{ .LogLevel }}
{{- end }}
init: true
ports:
- 26656
- {{ if .ProxyPort }}{{ addUint32 .ProxyPort 1000 }}:{{ end }}26660
- {{ if .ProxyPort }}{{ .ProxyPort }}:{{ end }}26657
- 6060
volumes:
- ./{{ .Name }}:/tendermint
networks:
{{ $.Name }}:
ipv{{ if $.IPv6 }}6{{ else }}4{{ end}}_address: {{ .IP }}
{{end}}`)
if err != nil {
return nil, err
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, testnet)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// MakeGenesis generates a genesis document.
func MakeGenesis(testnet *e2e.Testnet) (types.GenesisDoc, error) {
genesis := types.GenesisDoc{
@@ -357,7 +421,5 @@ func UpdateConfigStateSync(node *e2e.Node, height int64, hash []byte) error {
}
bz = regexp.MustCompile(`(?m)^trust-height =.*`).ReplaceAll(bz, []byte(fmt.Sprintf(`trust-height = %v`, height)))
bz = regexp.MustCompile(`(?m)^trust-hash =.*`).ReplaceAll(bz, []byte(fmt.Sprintf(`trust-hash = "%X"`, hash)))
// nolint: gosec
// G306: Expect WriteFile permissions to be 0600 or less
return os.WriteFile(cfgPath, bz, 0644)
}

View File

@@ -8,10 +8,9 @@ import (
"github.com/tendermint/tendermint/libs/log"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/infra"
)
func Start(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infra.TestnetInfra) error {
func Start(ctx context.Context, logger log.Logger, testnet *e2e.Testnet) error {
if len(testnet.Nodes) == 0 {
return fmt.Errorf("no nodes in testnet")
}
@@ -45,7 +44,7 @@ func Start(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
for len(nodeQueue) > 0 && nodeQueue[0].StartAt == 0 {
node := nodeQueue[0]
nodeQueue = nodeQueue[1:]
if err := ti.StartNode(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
return err
}
@@ -59,7 +58,7 @@ func Start(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
return err
}
node.HasStarted = true
logger.Info(fmt.Sprintf("Node %v up on http://%v:%v", node.IP, node.Name, node.ProxyPort))
logger.Info(fmt.Sprintf("Node %v up on http://127.0.0.1:%v", node.Name, node.ProxyPort))
}
networkHeight := testnet.InitialHeight
@@ -107,7 +106,7 @@ func Start(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
}
}
if err := ti.StartNode(ctx, node); err != nil {
if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
return err
}
@@ -129,8 +128,8 @@ func Start(ctx context.Context, logger log.Logger, testnet *e2e.Testnet, ti infr
} else {
lastNodeHeight = status.SyncInfo.LatestBlockHeight
}
logger.Info(fmt.Sprintf("Node %v up on http://%v:%v at height %v",
node.IP, node.Name, node.ProxyPort, lastNodeHeight))
logger.Info(fmt.Sprintf("Node %v up on http://127.0.0.1:%v at height %v",
node.Name, node.ProxyPort, lastNodeHeight))
}
return nil

View File

@@ -1,19 +1,17 @@
package main
import (
"context"
"os"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/test/e2e/pkg/exec"
)
// Test runs test cases under tests/
func Test(ctx context.Context, testnet *e2e.Testnet) error {
func Test(testnet *e2e.Testnet) error {
err := os.Setenv("E2E_MANIFEST", testnet.File)
if err != nil {
return err
}
return exec.CommandVerbose(ctx, "./build/tests", "-test.count=1")
return execVerbose("./build/tests", "-test.count=1")
}

View File

@@ -18,8 +18,6 @@ func TestNet_Peers(t *testing.T) {
netInfo, err := client.NetInfo(ctx)
require.NoError(t, err)
// FIXME: https://github.com/tendermint/tendermint/issues/8848
// We should be able to assert that we can discover all peers in a network
expectedPeers := len(node.Testnet.Nodes)
peers := make(map[string]*e2e.Node, 0)
seen := map[string]bool{}
@@ -33,7 +31,7 @@ func TestNet_Peers(t *testing.T) {
seen[n.Name] = false
}
require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers-1,
require.Equal(t, expectedPeers, netInfo.NPeers,
"node is not fully meshed with peers")
for _, peerInfo := range netInfo.Peers {
@@ -45,10 +43,8 @@ func TestNet_Peers(t *testing.T) {
seen[peer.Name] = true
}
// FIXME: https://github.com/tendermint/tendermint/issues/8848
// We should be able to assert that we can discover all peers in a network
// for name := range seen {
// require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
// }
for name := range seen {
require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
}
})
}

View File

@@ -7,8 +7,8 @@ import (
"strconv"
"strings"
tmstrings "github.com/tendermint/tendermint/internal/libs/strings"
"github.com/tendermint/tendermint/libs/bytes"
tmstrings "github.com/tendermint/tendermint/libs/strings"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

View File

@@ -9,7 +9,7 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/secp256k1"
"github.com/tendermint/tendermint/crypto/sr25519"
tmstrings "github.com/tendermint/tendermint/internal/libs/strings"
tmstrings "github.com/tendermint/tendermint/libs/strings"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)