mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-03 18:42:14 +00:00
* regenerate mocks using newer style
* p2p: set empty timeouts to small values. (#8847)
These timeouts default to 'do not time out' if they are not set. This times up resources, potentially indefinitely. If node on the other side of the the handshake is up but unresponsive, the[ handshake call](edec79448a/internal/p2p/router.go (L720)) will _never_ return.
* fix light client select statement
933 lines
23 KiB
Go
933 lines
23 KiB
Go
package p2p
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"regexp"
|
|
"strconv"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/crypto/ed25519"
|
|
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
|
|
"github.com/tendermint/tendermint/internal/p2p/conn"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
var (
|
|
cfg *config.P2PConfig
|
|
ctx = context.Background()
|
|
)
|
|
|
|
func init() {
|
|
cfg = config.DefaultP2PConfig()
|
|
cfg.PexReactor = true
|
|
cfg.AllowDuplicateIP = true
|
|
}
|
|
|
|
type PeerMessage struct {
|
|
PeerID types.NodeID
|
|
Bytes []byte
|
|
Counter int
|
|
}
|
|
|
|
type TestReactor struct {
|
|
BaseReactor
|
|
|
|
mtx tmsync.Mutex
|
|
channels []*conn.ChannelDescriptor
|
|
logMessages bool
|
|
msgsCounter int
|
|
msgsReceived map[byte][]PeerMessage
|
|
}
|
|
|
|
func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
|
|
tr := &TestReactor{
|
|
channels: channels,
|
|
logMessages: logMessages,
|
|
msgsReceived: make(map[byte][]PeerMessage),
|
|
}
|
|
tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
|
|
tr.SetLogger(log.TestingLogger())
|
|
return tr
|
|
}
|
|
|
|
func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
|
|
return tr.channels
|
|
}
|
|
|
|
func (tr *TestReactor) AddPeer(peer Peer) {}
|
|
|
|
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
|
|
|
|
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
|
|
if tr.logMessages {
|
|
tr.mtx.Lock()
|
|
defer tr.mtx.Unlock()
|
|
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
|
|
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
|
|
tr.msgsCounter++
|
|
}
|
|
}
|
|
|
|
func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
|
|
tr.mtx.Lock()
|
|
defer tr.mtx.Unlock()
|
|
return tr.msgsReceived[chID]
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
// convenience method for creating two switches connected to each other.
|
|
// XXX: note this uses net.Pipe and not a proper TCP conn
|
|
func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
|
|
// Create two switches that will be interconnected.
|
|
switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
|
|
return switches[0], switches[1]
|
|
}
|
|
|
|
func initSwitchFunc(i int, sw *Switch) *Switch {
|
|
sw.SetAddrBook(&AddrBookMock{
|
|
Addrs: make(map[string]struct{}),
|
|
OurAddrs: make(map[string]struct{})})
|
|
|
|
// Make two reactors of two channels each
|
|
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
|
|
{ID: byte(0x00), Priority: 10},
|
|
{ID: byte(0x01), Priority: 10},
|
|
}, true))
|
|
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
|
|
{ID: byte(0x02), Priority: 10},
|
|
{ID: byte(0x03), Priority: 10},
|
|
}, true))
|
|
|
|
return sw
|
|
}
|
|
|
|
func TestSwitches(t *testing.T) {
|
|
s1, s2 := MakeSwitchPair(t, initSwitchFunc)
|
|
t.Cleanup(func() {
|
|
if err := s1.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
t.Cleanup(func() {
|
|
if err := s2.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
if s1.Peers().Size() != 1 {
|
|
t.Errorf("expected exactly 1 peer in s1, got %v", s1.Peers().Size())
|
|
}
|
|
if s2.Peers().Size() != 1 {
|
|
t.Errorf("expected exactly 1 peer in s2, got %v", s2.Peers().Size())
|
|
}
|
|
|
|
// Lets send some messages
|
|
ch0Msg := []byte("channel zero")
|
|
ch1Msg := []byte("channel foo")
|
|
ch2Msg := []byte("channel bar")
|
|
|
|
s1.Broadcast(byte(0x00), ch0Msg)
|
|
s1.Broadcast(byte(0x01), ch1Msg)
|
|
s1.Broadcast(byte(0x02), ch2Msg)
|
|
|
|
assertMsgReceivedWithTimeout(t,
|
|
ch0Msg,
|
|
byte(0x00),
|
|
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
|
assertMsgReceivedWithTimeout(t,
|
|
ch1Msg,
|
|
byte(0x01),
|
|
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
|
assertMsgReceivedWithTimeout(t,
|
|
ch2Msg,
|
|
byte(0x02),
|
|
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
|
}
|
|
|
|
func assertMsgReceivedWithTimeout(
|
|
t *testing.T,
|
|
msgBytes []byte,
|
|
channel byte,
|
|
reactor *TestReactor,
|
|
checkPeriod,
|
|
timeout time.Duration,
|
|
) {
|
|
ticker := time.NewTicker(checkPeriod)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
msgs := reactor.getMsgs(channel)
|
|
if len(msgs) > 0 {
|
|
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
|
|
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
|
|
}
|
|
return
|
|
}
|
|
|
|
case <-time.After(timeout):
|
|
t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSwitchFiltersOutItself(t *testing.T) {
|
|
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
|
|
// simulate s1 having a public IP by creating a remote peer with the same ID
|
|
rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
|
|
rp.Start()
|
|
|
|
// addr should be rejected in addPeer based on the same ID
|
|
err := s1.DialPeerWithAddress(rp.Addr())
|
|
if assert.Error(t, err) {
|
|
if err, ok := err.(ErrRejected); ok {
|
|
if !err.IsSelf() {
|
|
t.Errorf("expected self to be rejected")
|
|
}
|
|
} else {
|
|
t.Errorf("expected ErrRejected")
|
|
}
|
|
}
|
|
|
|
assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
|
|
assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
|
|
|
|
rp.Stop()
|
|
|
|
assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
|
|
}
|
|
|
|
func TestSwitchDialFailsOnIncompatiblePeer(t *testing.T) {
|
|
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
ni := s1.NodeInfo()
|
|
ni.Network = "network-a"
|
|
s1.SetNodeInfo(ni)
|
|
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg, Network: "network-b"}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
err := s1.DialPeerWithAddress(rp.Addr())
|
|
require.Error(t, err)
|
|
errRejected, ok := err.(ErrRejected)
|
|
require.True(t, ok, "expected error to be of type IsRejected")
|
|
require.True(t, errRejected.IsIncompatible(), "expected error to be IsIncompatible")
|
|
|
|
// remote peer should not have been added to the addressbook
|
|
require.False(t, s1.addrBook.HasAddress(rp.Addr()))
|
|
}
|
|
|
|
func TestSwitchPeerFilter(t *testing.T) {
|
|
var (
|
|
filters = []PeerFilterFunc{
|
|
func(_ IPeerSet, _ Peer) error { return nil },
|
|
func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied") },
|
|
func(_ IPeerSet, _ Peer) error { return nil },
|
|
}
|
|
sw = MakeSwitch(
|
|
cfg,
|
|
1,
|
|
"testing",
|
|
"123.123.123",
|
|
initSwitchFunc,
|
|
log.TestingLogger(),
|
|
SwitchPeerFilters(filters...),
|
|
)
|
|
)
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// simulate remote peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
t.Cleanup(rp.Stop)
|
|
|
|
c, err := sw.transport.Dial(ctx, NewEndpoint(rp.Addr()))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
p := newPeer(
|
|
peerInfo,
|
|
newPeerConn(true, false, c),
|
|
sw.reactorsByCh,
|
|
sw.StopPeerForError,
|
|
)
|
|
|
|
err = sw.addPeer(p)
|
|
if err, ok := err.(ErrRejected); ok {
|
|
if !err.IsFiltered() {
|
|
t.Errorf("expected peer to be filtered")
|
|
}
|
|
} else {
|
|
t.Errorf("expected ErrRejected")
|
|
}
|
|
}
|
|
|
|
func TestSwitchPeerFilterTimeout(t *testing.T) {
|
|
var (
|
|
filters = []PeerFilterFunc{
|
|
func(_ IPeerSet, _ Peer) error {
|
|
time.Sleep(10 * time.Millisecond)
|
|
return nil
|
|
},
|
|
}
|
|
sw = MakeSwitch(
|
|
cfg,
|
|
1,
|
|
"testing",
|
|
"123.123.123",
|
|
initSwitchFunc,
|
|
log.TestingLogger(),
|
|
SwitchFilterTimeout(5*time.Millisecond),
|
|
SwitchPeerFilters(filters...),
|
|
)
|
|
)
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Log(err)
|
|
}
|
|
})
|
|
|
|
// simulate remote peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
c, err := sw.transport.Dial(ctx, NewEndpoint(rp.Addr()))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
p := newPeer(
|
|
peerInfo,
|
|
newPeerConn(true, false, c),
|
|
sw.reactorsByCh,
|
|
sw.StopPeerForError,
|
|
)
|
|
|
|
err = sw.addPeer(p)
|
|
if _, ok := err.(ErrFilterTimeout); !ok {
|
|
t.Errorf("expected ErrFilterTimeout")
|
|
}
|
|
}
|
|
|
|
func TestSwitchPeerFilterDuplicate(t *testing.T) {
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// simulate remote peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
c, err := sw.transport.Dial(ctx, NewEndpoint(rp.Addr()))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
p := newPeer(
|
|
peerInfo,
|
|
newPeerConn(true, false, c),
|
|
sw.reactorsByCh,
|
|
sw.StopPeerForError,
|
|
)
|
|
|
|
if err := sw.addPeer(p); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = sw.addPeer(p)
|
|
if errRej, ok := err.(ErrRejected); ok {
|
|
if !errRej.IsDuplicate() {
|
|
t.Errorf("expected peer to be duplicate. got %v", errRej)
|
|
}
|
|
} else {
|
|
t.Errorf("expected ErrRejected, got %v", err)
|
|
}
|
|
}
|
|
|
|
func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
|
|
time.Sleep(timeout)
|
|
if sw.Peers().Size() != 0 {
|
|
t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
|
|
}
|
|
}
|
|
|
|
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
|
|
assert, require := assert.New(t), require.New(t)
|
|
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
err := sw.Start()
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// simulate remote peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
c, err := sw.transport.Dial(ctx, NewEndpoint(rp.Addr()))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
p := newPeer(
|
|
peerInfo,
|
|
newPeerConn(true, false, c),
|
|
sw.reactorsByCh,
|
|
sw.StopPeerForError,
|
|
)
|
|
|
|
err = sw.addPeer(p)
|
|
require.Nil(err)
|
|
|
|
require.NotNil(sw.Peers().Get(rp.ID()))
|
|
|
|
// simulate failure by closing connection
|
|
err = p.CloseConn()
|
|
require.NoError(err)
|
|
|
|
assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
|
|
assert.False(p.IsRunning())
|
|
}
|
|
|
|
func TestSwitchStopPeerForError(t *testing.T) {
|
|
s := httptest.NewServer(promhttp.Handler())
|
|
defer s.Close()
|
|
|
|
scrapeMetrics := func() string {
|
|
resp, err := http.Get(s.URL)
|
|
require.NoError(t, err)
|
|
defer resp.Body.Close()
|
|
buf, _ := ioutil.ReadAll(resp.Body)
|
|
return string(buf)
|
|
}
|
|
|
|
namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers"
|
|
re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`)
|
|
peersMetricValue := func() float64 {
|
|
matches := re.FindStringSubmatch(scrapeMetrics())
|
|
f, _ := strconv.ParseFloat(matches[1], 64)
|
|
return f
|
|
}
|
|
|
|
p2pMetrics := PrometheusMetrics(namespace)
|
|
|
|
// make two connected switches
|
|
sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
|
|
// set metrics on sw1
|
|
if i == 0 {
|
|
opt := WithMetrics(p2pMetrics)
|
|
opt(sw)
|
|
}
|
|
return initSwitchFunc(i, sw)
|
|
})
|
|
|
|
assert.Equal(t, len(sw1.Peers().List()), 1)
|
|
assert.EqualValues(t, 1, peersMetricValue())
|
|
|
|
// send messages to the peer from sw1
|
|
p := sw1.Peers().List()[0]
|
|
p.Send(0x1, []byte("here's a message to send"))
|
|
|
|
// stop sw2. this should cause the p to fail,
|
|
// which results in calling StopPeerForError internally
|
|
t.Cleanup(func() {
|
|
if err := sw2.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// now call StopPeerForError explicitly, eg. from a reactor
|
|
sw1.StopPeerForError(p, fmt.Errorf("some err"))
|
|
|
|
assert.Equal(t, len(sw1.Peers().List()), 0)
|
|
assert.EqualValues(t, 0, peersMetricValue())
|
|
}
|
|
|
|
func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// 1. simulate failure by closing connection
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
err = sw.AddPersistentPeers([]string{rp.Addr().String()})
|
|
require.NoError(t, err)
|
|
|
|
err = sw.DialPeerWithAddress(rp.Addr())
|
|
require.Nil(t, err)
|
|
require.NotNil(t, sw.Peers().Get(rp.ID()))
|
|
|
|
p := sw.Peers().List()[0]
|
|
err = p.(*peer).CloseConn()
|
|
require.NoError(t, err)
|
|
|
|
waitUntilSwitchHasAtLeastNPeers(sw, 1)
|
|
assert.False(t, p.IsRunning()) // old peer instance
|
|
assert.Equal(t, 1, sw.Peers().Size()) // new peer instance
|
|
|
|
// 2. simulate first time dial failure
|
|
rp = &remotePeer{
|
|
PrivKey: ed25519.GenPrivKey(),
|
|
Config: cfg,
|
|
// Use different interface to prevent duplicate IP filter, this will break
|
|
// beyond two peers.
|
|
listenAddr: "127.0.0.1:0",
|
|
}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
conf := config.DefaultP2PConfig()
|
|
conf.TestDialFail = true // will trigger a reconnect
|
|
err = sw.addOutboundPeerWithConfig(rp.Addr(), conf)
|
|
require.NotNil(t, err)
|
|
// DialPeerWithAddres - sw.peerConfig resets the dialer
|
|
waitUntilSwitchHasAtLeastNPeers(sw, 2)
|
|
assert.Equal(t, 2, sw.Peers().Size())
|
|
}
|
|
|
|
func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) {
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// 1. simulate failure by closing the connection
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
err = sw.AddPersistentPeers([]string{rp.Addr().String()})
|
|
require.NoError(t, err)
|
|
|
|
conn, err := rp.Dial(sw.NetAddress())
|
|
require.NoError(t, err)
|
|
time.Sleep(50 * time.Millisecond)
|
|
require.NotNil(t, sw.Peers().Get(rp.ID()))
|
|
|
|
conn.Close()
|
|
|
|
waitUntilSwitchHasAtLeastNPeers(sw, 1)
|
|
assert.Equal(t, 1, sw.Peers().Size())
|
|
}
|
|
|
|
func TestSwitchDialPeersAsync(t *testing.T) {
|
|
if testing.Short() {
|
|
return
|
|
}
|
|
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
err = sw.DialPeersAsync([]string{rp.Addr().String()})
|
|
require.NoError(t, err)
|
|
time.Sleep(dialRandomizerIntervalMilliseconds * time.Millisecond)
|
|
require.NotNil(t, sw.Peers().Get(rp.ID()))
|
|
}
|
|
|
|
func waitUntilSwitchHasAtLeastNPeers(sw *Switch, n int) {
|
|
for i := 0; i < 20; i++ {
|
|
time.Sleep(250 * time.Millisecond)
|
|
has := sw.Peers().Size()
|
|
if has >= n {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSwitchFullConnectivity(t *testing.T) {
|
|
switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
|
|
defer func() {
|
|
for _, sw := range switches {
|
|
sw := sw
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
}
|
|
}()
|
|
|
|
for i, sw := range switches {
|
|
if sw.Peers().Size() != 2 {
|
|
t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSwitchAcceptRoutine(t *testing.T) {
|
|
cfg.MaxNumInboundPeers = 5
|
|
|
|
// Create some unconditional peers.
|
|
const unconditionalPeersNum = 2
|
|
var (
|
|
unconditionalPeers = make([]*remotePeer, unconditionalPeersNum)
|
|
unconditionalPeerIDs = make([]string, unconditionalPeersNum)
|
|
)
|
|
for i := 0; i < unconditionalPeersNum; i++ {
|
|
peer := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
peer.Start()
|
|
unconditionalPeers[i] = peer
|
|
unconditionalPeerIDs[i] = string(peer.ID())
|
|
}
|
|
|
|
// make switch
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
err := sw.AddUnconditionalPeerIDs(unconditionalPeerIDs)
|
|
require.NoError(t, err)
|
|
err = sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
err := sw.Stop()
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
// 0. check there are no peers
|
|
assert.Equal(t, 0, sw.Peers().Size())
|
|
|
|
// 1. check we connect up to MaxNumInboundPeers
|
|
peers := make([]*remotePeer, 0)
|
|
for i := 0; i < cfg.MaxNumInboundPeers; i++ {
|
|
peer := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
peers = append(peers, peer)
|
|
peer.Start()
|
|
c, err := peer.Dial(sw.NetAddress())
|
|
require.NoError(t, err)
|
|
// spawn a reading routine to prevent connection from closing
|
|
go func(c net.Conn) {
|
|
for {
|
|
one := make([]byte, 1)
|
|
_, err := c.Read(one)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}(c)
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
|
|
|
|
// 2. check we close new connections if we already have MaxNumInboundPeers peers
|
|
peer := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
peer.Start()
|
|
conn, err := peer.Dial(sw.NetAddress())
|
|
require.NoError(t, err)
|
|
// check conn is closed
|
|
one := make([]byte, 1)
|
|
_ = conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
|
|
_, err = conn.Read(one)
|
|
assert.Error(t, err)
|
|
assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
|
|
peer.Stop()
|
|
|
|
// 3. check we connect to unconditional peers despite the limit.
|
|
for _, peer := range unconditionalPeers {
|
|
c, err := peer.Dial(sw.NetAddress())
|
|
require.NoError(t, err)
|
|
// spawn a reading routine to prevent connection from closing
|
|
go func(c net.Conn) {
|
|
for {
|
|
one := make([]byte, 1)
|
|
_, err := c.Read(one)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}(c)
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalPeersNum, sw.Peers().Size())
|
|
|
|
for _, peer := range peers {
|
|
peer.Stop()
|
|
}
|
|
for _, peer := range unconditionalPeers {
|
|
peer.Stop()
|
|
}
|
|
}
|
|
|
|
func TestSwitchRejectsIncompatiblePeers(t *testing.T) {
|
|
sw := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc, log.TestingLogger())
|
|
ni := sw.NodeInfo()
|
|
ni.Network = "network-a"
|
|
sw.SetNodeInfo(ni)
|
|
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
err := sw.Stop()
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg, Network: "network-b"}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
|
|
assert.Equal(t, 0, sw.Peers().Size())
|
|
|
|
conn, err := rp.Dial(sw.NetAddress())
|
|
assert.Nil(t, err)
|
|
|
|
one := make([]byte, 1)
|
|
_ = conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
|
|
_, err = conn.Read(one)
|
|
assert.Error(t, err)
|
|
|
|
assert.Equal(t, 0, sw.Peers().Size())
|
|
}
|
|
|
|
type errorTransport struct {
|
|
acceptErr error
|
|
}
|
|
|
|
func (et errorTransport) String() string {
|
|
return "error"
|
|
}
|
|
|
|
func (et errorTransport) Protocols() []Protocol {
|
|
return []Protocol{"error"}
|
|
}
|
|
|
|
func (et errorTransport) Accept() (Connection, error) {
|
|
return nil, et.acceptErr
|
|
}
|
|
func (errorTransport) Dial(context.Context, Endpoint) (Connection, error) {
|
|
panic("not implemented")
|
|
}
|
|
func (errorTransport) Close() error { panic("not implemented") }
|
|
func (errorTransport) FlushClose() error { panic("not implemented") }
|
|
func (errorTransport) Endpoints() []Endpoint { panic("not implemented") }
|
|
|
|
func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
|
|
sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
|
|
assert.NotPanics(t, func() {
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
err = sw.Stop()
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
|
|
assert.NotPanics(t, func() {
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
err = sw.Stop()
|
|
require.NoError(t, err)
|
|
})
|
|
// TODO(melekes) check we remove our address from addrBook
|
|
|
|
sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
|
|
assert.NotPanics(t, func() {
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
err = sw.Stop()
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
// mockReactor checks that InitPeer never called before RemovePeer. If that's
|
|
// not true, InitCalledBeforeRemoveFinished will return true.
|
|
type mockReactor struct {
|
|
*BaseReactor
|
|
|
|
// atomic
|
|
removePeerInProgress uint32
|
|
initCalledBeforeRemoveFinished uint32
|
|
}
|
|
|
|
func (r *mockReactor) GetChannels() []*ChannelDescriptor {
|
|
return []*ChannelDescriptor{{ID: testCh, Priority: 10}}
|
|
}
|
|
|
|
func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) {
|
|
atomic.StoreUint32(&r.removePeerInProgress, 1)
|
|
defer atomic.StoreUint32(&r.removePeerInProgress, 0)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
func (r *mockReactor) InitPeer(peer Peer) Peer {
|
|
if atomic.LoadUint32(&r.removePeerInProgress) == 1 {
|
|
atomic.StoreUint32(&r.initCalledBeforeRemoveFinished, 1)
|
|
}
|
|
|
|
return peer
|
|
}
|
|
|
|
func (r *mockReactor) InitCalledBeforeRemoveFinished() bool {
|
|
return atomic.LoadUint32(&r.initCalledBeforeRemoveFinished) == 1
|
|
}
|
|
|
|
// see stopAndRemovePeer
|
|
func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) {
|
|
// make reactor
|
|
reactor := &mockReactor{}
|
|
reactor.BaseReactor = NewBaseReactor("mockReactor", reactor)
|
|
|
|
// make switch
|
|
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch {
|
|
sw.AddReactor("mock", reactor)
|
|
return sw
|
|
}, log.TestingLogger())
|
|
err := sw.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := sw.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
// add peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
defer rp.Stop()
|
|
_, err = rp.Dial(sw.NetAddress())
|
|
require.NoError(t, err)
|
|
|
|
// wait till the switch adds rp to the peer set, then stop the peer asynchronously
|
|
for {
|
|
time.Sleep(20 * time.Millisecond)
|
|
if peer := sw.Peers().Get(rp.ID()); peer != nil {
|
|
go sw.StopPeerForError(peer, "test")
|
|
break
|
|
}
|
|
}
|
|
|
|
// simulate peer reconnecting to us
|
|
_, err = rp.Dial(sw.NetAddress())
|
|
require.NoError(t, err)
|
|
// wait till the switch adds rp to the peer set
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// make sure reactor.RemovePeer is finished before InitPeer is called
|
|
assert.False(t, reactor.InitCalledBeforeRemoveFinished())
|
|
}
|
|
|
|
func BenchmarkSwitchBroadcast(b *testing.B) {
|
|
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
|
|
// Make bar reactors of bar channels each
|
|
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
|
|
{ID: byte(0x00), Priority: 10},
|
|
{ID: byte(0x01), Priority: 10},
|
|
}, false))
|
|
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
|
|
{ID: byte(0x02), Priority: 10},
|
|
{ID: byte(0x03), Priority: 10},
|
|
}, false))
|
|
return sw
|
|
})
|
|
|
|
b.Cleanup(func() {
|
|
if err := s1.Stop(); err != nil {
|
|
b.Error(err)
|
|
}
|
|
})
|
|
|
|
b.Cleanup(func() {
|
|
if err := s2.Stop(); err != nil {
|
|
b.Error(err)
|
|
}
|
|
})
|
|
|
|
// Allow time for goroutines to boot up
|
|
time.Sleep(1 * time.Second)
|
|
|
|
b.ResetTimer()
|
|
|
|
numSuccess, numFailure := 0, 0
|
|
|
|
// Send random message from foo channel to another
|
|
for i := 0; i < b.N; i++ {
|
|
chID := byte(i % 4)
|
|
successChan := s1.Broadcast(chID, []byte("test data"))
|
|
for s := range successChan {
|
|
if s {
|
|
numSuccess++
|
|
} else {
|
|
numFailure++
|
|
}
|
|
}
|
|
}
|
|
|
|
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
|
|
}
|
|
|
|
func TestNewNetAddressStrings(t *testing.T) {
|
|
addrs, errs := NewNetAddressStrings([]string{
|
|
"127.0.0.1:8080",
|
|
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeef@127.0.0.1:8080",
|
|
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeed@127.0.0.2:8080"})
|
|
assert.Len(t, errs, 1)
|
|
assert.Equal(t, 2, len(addrs))
|
|
}
|