mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-06 05:25:35 +00:00
* p2p: add a per-message type send and receive metric (#9622) * p2p: resurrect the p2p envelope and use it to calculate message metric Add new SendEnvelope, TrySendEnvelope, BroadcastEnvelope, and ReceiveEnvelope methods in the p2p package to work with the new envelope type. Care was taken to ensure this was performed in a non-breaking manner. Co-authored-by: William Banfield <4561443+williambanfield@users.noreply.github.com> Co-authored-by: William Banfield <wbanfield@gmail.com>
243 lines
5.5 KiB
Go
243 lines
5.5 KiB
Go
package p2p
|
|
|
|
import (
|
|
"fmt"
|
|
golog "log"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/tendermint/tendermint/crypto"
|
|
"github.com/tendermint/tendermint/crypto/ed25519"
|
|
"github.com/tendermint/tendermint/libs/bytes"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/proto/tendermint/p2p"
|
|
|
|
"github.com/tendermint/tendermint/config"
|
|
tmconn "github.com/tendermint/tendermint/p2p/conn"
|
|
)
|
|
|
|
func TestPeerBasic(t *testing.T) {
|
|
assert, require := assert.New(t), require.New(t)
|
|
|
|
// simulate remote peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
|
|
rp.Start()
|
|
t.Cleanup(rp.Stop)
|
|
|
|
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), cfg, tmconn.DefaultMConnConfig())
|
|
require.Nil(err)
|
|
|
|
err = p.Start()
|
|
require.Nil(err)
|
|
t.Cleanup(func() {
|
|
if err := p.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
assert.True(p.IsRunning())
|
|
assert.True(p.IsOutbound())
|
|
assert.False(p.IsPersistent())
|
|
p.persistent = true
|
|
assert.True(p.IsPersistent())
|
|
assert.Equal(rp.Addr().DialString(), p.RemoteAddr().String())
|
|
assert.Equal(rp.ID(), p.ID())
|
|
}
|
|
|
|
func TestPeerSend(t *testing.T) {
|
|
assert, require := assert.New(t), require.New(t)
|
|
|
|
config := cfg
|
|
|
|
// simulate remote peer
|
|
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: config}
|
|
rp.Start()
|
|
t.Cleanup(rp.Stop)
|
|
|
|
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config, tmconn.DefaultMConnConfig())
|
|
require.Nil(err)
|
|
|
|
err = p.Start()
|
|
require.Nil(err)
|
|
|
|
t.Cleanup(func() {
|
|
if err := p.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
assert.True(p.CanSend(testCh))
|
|
assert.True(SendEnvelopeShim(p, Envelope{ChannelID: testCh, Message: &p2p.Message{}}, p.Logger))
|
|
}
|
|
|
|
func createOutboundPeerAndPerformHandshake(
|
|
addr *NetAddress,
|
|
config *config.P2PConfig,
|
|
mConfig tmconn.MConnConfig,
|
|
) (*peer, error) {
|
|
chDescs := []*tmconn.ChannelDescriptor{
|
|
{ID: testCh, Priority: 1},
|
|
}
|
|
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
|
|
msgTypeByChID := map[byte]proto.Message{
|
|
testCh: &p2p.Message{},
|
|
}
|
|
pk := ed25519.GenPrivKey()
|
|
pc, err := testOutboundPeerConn(addr, config, false, pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
timeout := 1 * time.Second
|
|
ourNodeInfo := testNodeInfo(addr.ID, "host_peer")
|
|
peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {}, newMetricsLabelCache())
|
|
p.SetLogger(log.TestingLogger().With("peer", addr))
|
|
return p, nil
|
|
}
|
|
|
|
func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
|
|
if cfg.TestDialFail {
|
|
return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)")
|
|
}
|
|
|
|
conn, err := addr.DialTimeout(cfg.DialTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
func testOutboundPeerConn(
|
|
addr *NetAddress,
|
|
config *config.P2PConfig,
|
|
persistent bool,
|
|
ourNodePrivKey crypto.PrivKey,
|
|
) (peerConn, error) {
|
|
|
|
var pc peerConn
|
|
conn, err := testDial(addr, config)
|
|
if err != nil {
|
|
return pc, fmt.Errorf("error creating peer: %w", err)
|
|
}
|
|
|
|
pc, err = testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
|
|
if err != nil {
|
|
if cerr := conn.Close(); cerr != nil {
|
|
return pc, fmt.Errorf("%v: %w", cerr.Error(), err)
|
|
}
|
|
return pc, err
|
|
}
|
|
|
|
// ensure dialed ID matches connection ID
|
|
if addr.ID != pc.ID() {
|
|
if cerr := conn.Close(); cerr != nil {
|
|
return pc, fmt.Errorf("%v: %w", cerr.Error(), err)
|
|
}
|
|
return pc, ErrSwitchAuthenticationFailure{addr, pc.ID()}
|
|
}
|
|
|
|
return pc, nil
|
|
}
|
|
|
|
type remotePeer struct {
|
|
PrivKey crypto.PrivKey
|
|
Config *config.P2PConfig
|
|
addr *NetAddress
|
|
channels bytes.HexBytes
|
|
listenAddr string
|
|
listener net.Listener
|
|
}
|
|
|
|
func (rp *remotePeer) Addr() *NetAddress {
|
|
return rp.addr
|
|
}
|
|
|
|
func (rp *remotePeer) ID() ID {
|
|
return PubKeyToID(rp.PrivKey.PubKey())
|
|
}
|
|
|
|
func (rp *remotePeer) Start() {
|
|
if rp.listenAddr == "" {
|
|
rp.listenAddr = "127.0.0.1:0"
|
|
}
|
|
|
|
l, e := net.Listen("tcp", rp.listenAddr) // any available address
|
|
if e != nil {
|
|
golog.Fatalf("net.Listen tcp :0: %+v", e)
|
|
}
|
|
rp.listener = l
|
|
rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr())
|
|
if rp.channels == nil {
|
|
rp.channels = []byte{testCh}
|
|
}
|
|
go rp.accept()
|
|
}
|
|
|
|
func (rp *remotePeer) Stop() {
|
|
rp.listener.Close()
|
|
}
|
|
|
|
func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
|
|
conn, err := addr.DialTimeout(1 * time.Second)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return conn, err
|
|
}
|
|
|
|
func (rp *remotePeer) accept() {
|
|
conns := []net.Conn{}
|
|
|
|
for {
|
|
conn, err := rp.listener.Accept()
|
|
if err != nil {
|
|
golog.Printf("Failed to accept conn: %+v", err)
|
|
for _, conn := range conns {
|
|
_ = conn.Close()
|
|
}
|
|
return
|
|
}
|
|
|
|
pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
|
|
if err != nil {
|
|
golog.Fatalf("Failed to create a peer: %+v", err)
|
|
}
|
|
|
|
_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
|
|
if err != nil {
|
|
golog.Fatalf("Failed to perform handshake: %+v", err)
|
|
}
|
|
|
|
conns = append(conns, conn)
|
|
}
|
|
}
|
|
|
|
func (rp *remotePeer) nodeInfo() NodeInfo {
|
|
return DefaultNodeInfo{
|
|
ProtocolVersion: defaultProtocolVersion,
|
|
DefaultNodeID: rp.Addr().ID,
|
|
ListenAddr: rp.listener.Addr().String(),
|
|
Network: "testing",
|
|
Version: "1.2.3-rc0-deadbeef",
|
|
Channels: rp.channels,
|
|
Moniker: "remote_peer",
|
|
}
|
|
}
|