diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 583bb6766..af79fe563 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -194,7 +194,7 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) { return } - r.blockchainCh.Out() <- p2p.Envelope{ + r.blockchainCh.Out <- p2p.Envelope{ To: peerID, Message: &bcproto.BlockResponse{Block: blockProto}, } @@ -203,7 +203,7 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) { } r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) - r.blockchainCh.Out() <- p2p.Envelope{ + r.blockchainCh.Out <- p2p.Envelope{ To: peerID, Message: &bcproto.NoBlockResponse{Height: msg.Height}, } @@ -229,7 +229,7 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { r.pool.AddBlock(envelope.From, block, block.Size()) case *bcproto.StatusRequest: - r.blockchainCh.Out() <- p2p.Envelope{ + r.blockchainCh.Out <- p2p.Envelope{ To: envelope.From, Message: &bcproto.StatusResponse{ Height: r.store.Height(), @@ -284,10 +284,10 @@ func (r *Reactor) processBlockchainCh() { for { select { - case envelope := <-r.blockchainCh.In(): - if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err) - r.blockchainCh.Error() <- p2p.PeerError{ + case envelope := <-r.blockchainCh.In: + if err := r.handleMessage(r.blockchainCh.ID, envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID, "envelope", envelope, "err", err) + r.blockchainCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } @@ -312,7 +312,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.blockchainCh.Out() <- p2p.Envelope{ + r.blockchainCh.Out <- p2p.Envelope{ To: 
peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(), @@ -376,13 +376,13 @@ func (r *Reactor) requestRoutine() { return case request := <-r.requestsCh: - r.blockchainCh.Out() <- p2p.Envelope{ + r.blockchainCh.Out <- p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, } case pErr := <-r.errorsCh: - r.blockchainCh.Error() <- p2p.PeerError{ + r.blockchainCh.Error <- p2p.PeerError{ NodeID: pErr.peerID, Err: pErr.err, } @@ -393,7 +393,7 @@ func (r *Reactor) requestRoutine() { go func() { defer r.poolWG.Done() - r.blockchainCh.Out() <- p2p.Envelope{ + r.blockchainCh.Out <- p2p.Envelope{ Broadcast: true, Message: &bcproto.StatusRequest{}, } @@ -522,14 +522,14 @@ FOR_LOOP: // NOTE: We've already removed the peer's request, but we still need // to clean up the rest. peerID := r.pool.RedoRequest(first.Height) - r.blockchainCh.Error() <- p2p.PeerError{ + r.blockchainCh.Error <- p2p.PeerError{ NodeID: peerID, Err: err, } peerID2 := r.pool.RedoRequest(second.Height) if peerID2 != peerID { - r.blockchainCh.Error() <- p2p.PeerError{ + r.blockchainCh.Error <- p2p.PeerError{ NodeID: peerID2, Err: err, } diff --git a/evidence/reactor.go b/evidence/reactor.go index cfa9b8989..aa3db318e 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -192,10 +192,10 @@ func (r *Reactor) processEvidenceCh() { for { select { - case envelope := <-r.evidenceCh.In(): - if err := r.handleMessage(r.evidenceCh.ID(), envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.evidenceCh.ID(), "envelope", envelope, "err", err) - r.evidenceCh.Error() <- p2p.PeerError{ + case envelope := <-r.evidenceCh.In: + if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err) + r.evidenceCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } @@ -337,7 +337,7 @@ func (r *Reactor) broadcastEvidenceLoop(peerID 
p2p.NodeID, closer *tmsync.Closer // and thus would not be able to process the evidence correctly. Also, the // peer may receive this piece of evidence multiple times if it added and // removed frequently from the broadcasting peer. - r.evidenceCh.Out() <- p2p.Envelope{ + r.evidenceCh.Out <- p2p.Envelope{ To: peerID, Message: &tmproto.EvidenceList{ Evidence: []tmproto.Evidence{*evProto}, diff --git a/mempool/reactor.go b/mempool/reactor.go index 8b2a1c063..2c8b1c2cd 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -221,10 +221,10 @@ func (r *Reactor) processMempoolCh() { for { select { - case envelope := <-r.mempoolCh.In(): - if err := r.handleMessage(r.mempoolCh.ID(), envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID(), "envelope", envelope, "err", err) - r.mempoolCh.Error() <- p2p.PeerError{ + case envelope := <-r.mempoolCh.In: + if err := r.handleMessage(r.mempoolCh.ID, envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err) + r.mempoolCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } @@ -370,7 +370,7 @@ func (r *Reactor) broadcastTxRoutine(peerID p2p.NodeID, closer *tmsync.Closer) { if _, ok := memTx.senders.Load(peerMempoolID); !ok { // Send the mempool tx to the corresponding peer. Note, the peer may be // behind and thus would not be able to process the mempool tx correctly. - r.mempoolCh.Out() <- p2p.Envelope{ + r.mempoolCh.Out <- p2p.Envelope{ To: peerID, Message: &protomem.Txs{ Txs: [][]byte{memTx.tx}, diff --git a/node/node.go b/node/node.go index 1086d40be..c82877422 100644 --- a/node/node.go +++ b/node/node.go @@ -758,7 +758,7 @@ func NewNode(config *cfg.Config, // TODO: Fetch and provide real options and do proper p2p bootstrapping. // TODO: Use a persistent peer database. 
- peerMgr, err := p2p.NewPeerManager(dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerMgr, err := p2p.NewPeerManager(nodeKey.ID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) if err != nil { return nil, err } diff --git a/p2p/channel.go b/p2p/channel.go deleted file mode 100644 index c13cd75d1..000000000 --- a/p2p/channel.go +++ /dev/null @@ -1,145 +0,0 @@ -package p2p - -import ( - "sync" - - "github.com/gogo/protobuf/proto" -) - -// ChannelID is an arbitrary channel ID. -type ChannelID uint16 - -// Envelope specifies the message receiver and sender. -type Envelope struct { - From NodeID // Message sender, or empty for outbound messages. - To NodeID // Message receiver, or empty for inbound messages. - Broadcast bool // Send message to all connected peers, ignoring To. - Message proto.Message // Payload. - - // For internal use in the Router. - channelID ChannelID -} - -// Strip strips internal information from the envelope. Primarily used for -// testing, such that returned envelopes can be compared with literals. -func (e Envelope) Strip() Envelope { - e.channelID = 0 - return e -} - -// PeerError is a peer error reported via the Error channel. -// -// FIXME: This currently just disconnects the peer, which is too simplistic. -// For example, some errors should be logged, some should cause disconnects, -// and some should ban the peer. -// -// FIXME: This should probably be replaced by a more general PeerBehavior -// concept that can mark good and bad behavior and contributes to peer scoring. -// It should possibly also allow reactors to request explicit actions, e.g. -// disconnection or banning, in addition to doing this based on aggregates. -type PeerError struct { - NodeID NodeID - Err error -} - -// Channel is a bidirectional channel for Protobuf message exchange with peers. -// A Channel is safe for concurrent use by multiple goroutines. -type Channel struct { - closeOnce sync.Once - - // id defines the unique channel ID. 
- id ChannelID - - // messageType specifies the type of messages exchanged via the channel, and - // is used e.g. for automatic unmarshaling. - messageType proto.Message - - // inCh is a channel for receiving inbound messages. Envelope.From is always - // set. - inCh chan Envelope - - // outCh is a channel for sending outbound messages. Envelope.To or Broadcast - // must be set, otherwise the message is discarded. - outCh chan Envelope - - // errCh is a channel for reporting peer errors to the router, typically used - // when peers send an invalid or malignant message. - errCh chan PeerError - - // doneCh is used to signal that a Channel is closed. A Channel is bi-directional - // and should be closed by the reactor, where as the router is responsible - // for explicitly closing the internal In channel. - doneCh chan struct{} -} - -// NewChannel returns a reference to a new p2p Channel. It is the reactor's -// responsibility to close the Channel. After a channel is closed, the router may -// safely and explicitly close the internal In channel. -func NewChannel(id ChannelID, mType proto.Message, in, out chan Envelope, errCh chan PeerError) *Channel { - return &Channel{ - id: id, - messageType: mType, - inCh: in, - outCh: out, - errCh: errCh, - doneCh: make(chan struct{}), - } -} - -// ID returns the Channel's ID. -func (c *Channel) ID() ChannelID { - return c.id -} - -// In returns a read-only inbound go channel. This go channel should be used by -// reactors to consume Envelopes sent from peers. -func (c *Channel) In() <-chan Envelope { - return c.inCh -} - -// Out returns a write-only outbound go channel. This go channel should be used -// by reactors to route Envelopes to other peers. -func (c *Channel) Out() chan<- Envelope { - return c.outCh -} - -// Error returns a write-only outbound go channel designated for peer errors only. -// This go channel should be used by reactors to send peer errors when consuming -// Envelopes sent from other peers. 
-func (c *Channel) Error() chan<- PeerError { - return c.errCh -} - -// Close closes the outbound channel and marks the Channel as done. Internally, -// the outbound outCh and peer error errCh channels are closed. It is the reactor's -// responsibility to invoke Close. Any send on the Out or Error channel will -// panic after the Channel is closed. -// -// NOTE: After a Channel is closed, the router may safely assume it can no longer -// send on the internal inCh, however it should NEVER explicitly close it as -// that could result in panics by sending on a closed channel. -func (c *Channel) Close() { - c.closeOnce.Do(func() { - close(c.doneCh) - close(c.outCh) - close(c.errCh) - }) -} - -// Done returns the Channel's internal channel that should be used by a router -// to signal when it is safe to send on the internal inCh go channel. -func (c *Channel) Done() <-chan struct{} { - return c.doneCh -} - -// Wrapper is a Protobuf message that can contain a variety of inner messages. -// If a Channel's message type implements Wrapper, the channel will -// automatically (un)wrap passed messages using the container type, such that -// the channel can transparently support multiple message types. -type Wrapper interface { - // Wrap will take a message and wrap it in this one. - Wrap(proto.Message) error - - // Unwrap will unwrap the inner message contained in this message. - Unwrap() (proto.Message, error) -} diff --git a/p2p/mocks/connection.go b/p2p/mocks/connection.go new file mode 100644 index 000000000..07c4540ed --- /dev/null +++ b/p2p/mocks/connection.go @@ -0,0 +1,206 @@ +// Code generated by mockery v2.5.1. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + conn "github.com/tendermint/tendermint/p2p/conn" + + crypto "github.com/tendermint/tendermint/crypto" + + mock "github.com/stretchr/testify/mock" + + p2p "github.com/tendermint/tendermint/p2p" +) + +// Connection is an autogenerated mock type for the Connection type +type Connection struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Connection) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// FlushClose provides a mock function with given fields: +func (_m *Connection) FlushClose() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Handshake provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Connection) Handshake(_a0 context.Context, _a1 p2p.NodeInfo, _a2 crypto.PrivKey) (p2p.NodeInfo, crypto.PubKey, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 p2p.NodeInfo + if rf, ok := ret.Get(0).(func(context.Context, p2p.NodeInfo, crypto.PrivKey) p2p.NodeInfo); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Get(0).(p2p.NodeInfo) + } + + var r1 crypto.PubKey + if rf, ok := ret.Get(1).(func(context.Context, p2p.NodeInfo, crypto.PrivKey) crypto.PubKey); ok { + r1 = rf(_a0, _a1, _a2) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(crypto.PubKey) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, p2p.NodeInfo, crypto.PrivKey) error); ok { + r2 = rf(_a0, _a1, _a2) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// LocalEndpoint provides a mock function with given fields: +func (_m *Connection) LocalEndpoint() p2p.Endpoint { + ret := _m.Called() + + var r0 p2p.Endpoint + if rf, ok := ret.Get(0).(func() p2p.Endpoint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(p2p.Endpoint) + } + + 
return r0 +} + +// ReceiveMessage provides a mock function with given fields: +func (_m *Connection) ReceiveMessage() (p2p.ChannelID, []byte, error) { + ret := _m.Called() + + var r0 p2p.ChannelID + if rf, ok := ret.Get(0).(func() p2p.ChannelID); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(p2p.ChannelID) + } + + var r1 []byte + if rf, ok := ret.Get(1).(func() []byte); ok { + r1 = rf() + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]byte) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func() error); ok { + r2 = rf() + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// RemoteEndpoint provides a mock function with given fields: +func (_m *Connection) RemoteEndpoint() p2p.Endpoint { + ret := _m.Called() + + var r0 p2p.Endpoint + if rf, ok := ret.Get(0).(func() p2p.Endpoint); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(p2p.Endpoint) + } + + return r0 +} + +// SendMessage provides a mock function with given fields: _a0, _a1 +func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error) { + ret := _m.Called(_a0, _a1) + + var r0 bool + if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) bool); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(p2p.ChannelID, []byte) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Status provides a mock function with given fields: +func (_m *Connection) Status() conn.ConnectionStatus { + ret := _m.Called() + + var r0 conn.ConnectionStatus + if rf, ok := ret.Get(0).(func() conn.ConnectionStatus); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(conn.ConnectionStatus) + } + + return r0 +} + +// String provides a mock function with given fields: +func (_m *Connection) String() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// TrySendMessage provides a mock function 
with given fields: _a0, _a1 +func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error) { + ret := _m.Called(_a0, _a1) + + var r0 bool + if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) bool); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(p2p.ChannelID, []byte) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/p2p/mocks/transport.go b/p2p/mocks/transport.go new file mode 100644 index 000000000..980c4a606 --- /dev/null +++ b/p2p/mocks/transport.go @@ -0,0 +1,121 @@ +// Code generated by mockery v2.5.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + p2p "github.com/tendermint/tendermint/p2p" +) + +// Transport is an autogenerated mock type for the Transport type +type Transport struct { + mock.Mock +} + +// Accept provides a mock function with given fields: +func (_m *Transport) Accept() (p2p.Connection, error) { + ret := _m.Called() + + var r0 p2p.Connection + if rf, ok := ret.Get(0).(func() p2p.Connection); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(p2p.Connection) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Close provides a mock function with given fields: +func (_m *Transport) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Dial provides a mock function with given fields: _a0, _a1 +func (_m *Transport) Dial(_a0 context.Context, _a1 p2p.Endpoint) (p2p.Connection, error) { + ret := _m.Called(_a0, _a1) + + var r0 p2p.Connection + if rf, ok := ret.Get(0).(func(context.Context, p2p.Endpoint) p2p.Connection); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(p2p.Connection) + } + } + + 
var r1 error + if rf, ok := ret.Get(1).(func(context.Context, p2p.Endpoint) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Endpoints provides a mock function with given fields: +func (_m *Transport) Endpoints() []p2p.Endpoint { + ret := _m.Called() + + var r0 []p2p.Endpoint + if rf, ok := ret.Get(0).(func() []p2p.Endpoint); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]p2p.Endpoint) + } + } + + return r0 +} + +// Protocols provides a mock function with given fields: +func (_m *Transport) Protocols() []p2p.Protocol { + ret := _m.Called() + + var r0 []p2p.Protocol + if rf, ok := ret.Get(0).(func() []p2p.Protocol); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]p2p.Protocol) + } + } + + return r0 +} + +// String provides a mock function with given fields: +func (_m *Transport) String() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go new file mode 100644 index 000000000..9ab57e614 --- /dev/null +++ b/p2p/p2p_test.go @@ -0,0 +1,34 @@ +package p2p_test + +import ( + "context" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/p2p" +) + +// Common setup for P2P tests. 
+ +var ( + ctx = context.Background() + chID = p2p.ChannelID(1) + + selfKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd}) + selfID = p2p.NodeIDFromPubKey(selfKey.PubKey()) + selfInfo = p2p.NodeInfo{ + NodeID: selfID, + ListenAddr: "0.0.0.0:0", + Network: "test", + Moniker: string(selfID), + } + + peerKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0x84, 0xd7, 0x01, 0xbf, 0x83, 0x20, 0x1c, 0xfe}) + peerID = p2p.NodeIDFromPubKey(peerKey.PubKey()) + peerInfo = p2p.NodeInfo{ + NodeID: peerID, + ListenAddr: "0.0.0.0:0", + Network: "test", + Moniker: string(peerID), + } +) diff --git a/p2p/p2ptest/network.go b/p2p/p2ptest/network.go new file mode 100644 index 000000000..9ae2def8b --- /dev/null +++ b/p2p/p2ptest/network.go @@ -0,0 +1,240 @@ +package p2ptest + +import ( + "math/rand" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" +) + +// Network sets up an in-memory network that can be used for high-level P2P +// testing. It creates an arbitrary number of nodes that are connected to each +// other, and can open channels across all nodes with custom reactors. +type Network struct { + Nodes map[p2p.NodeID]*Node + + logger log.Logger + memoryNetwork *p2p.MemoryNetwork +} + +// MakeNetwork creates a test network with the given number of nodes and +// connects them to each other. 
+func MakeNetwork(t *testing.T, nodes int) *Network { + logger := log.TestingLogger() + network := &Network{ + Nodes: map[p2p.NodeID]*Node{}, + logger: logger, + memoryNetwork: p2p.NewMemoryNetwork(logger), + } + for i := 0; i < nodes; i++ { + node := MakeNode(t, network) + network.Nodes[node.NodeID] = node + } + + // Set up a list of node addresses to dial, and a peer update subscription + // for each node. + dialQueue := []p2p.NodeAddress{} + subs := map[p2p.NodeID]*p2p.PeerUpdates{} + for _, node := range network.Nodes { + dialQueue = append(dialQueue, node.NodeAddress) + subs[node.NodeID] = node.PeerManager.Subscribe() + defer subs[node.NodeID].Close() + } + + // For each node, dial the nodes that it still doesn't have a connection to + // (either inbound or outbound), and wait for both sides to confirm the + // connection via the subscriptions. + for i, sourceAddress := range dialQueue { + sourceNode := network.Nodes[sourceAddress.NodeID] + sourceSub := subs[sourceAddress.NodeID] + for _, targetAddress := range dialQueue[i+1:] { // nodes 0 { var cancel context.CancelFunc resolveCtx, cancel = context.WithTimeout(resolveCtx, r.options.ResolveTimeout) defer cancel() } + + r.logger.Debug("resolving peer address", "peer", address) endpoints, err := address.Resolve(resolveCtx) - if err != nil { + switch { + case err != nil: return nil, fmt.Errorf("failed to resolve address %q: %w", address, err) + case len(endpoints) == 0: + return nil, fmt.Errorf("address %q did not resolve to any endpoints", address) } for _, endpoint := range endpoints { - transport, ok := r.transports[endpoint.Protocol] + transport, ok := r.protocolTransports[endpoint.Protocol] if !ok { - r.logger.Error("no transport found for endpoint protocol", "endpoint", endpoint) + r.logger.Error("no transport found for protocol", "endpoint", endpoint) continue } @@ -463,17 +561,17 @@ func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection, // Internet can't and needs a different 
public address. conn, err := transport.Dial(dialCtx, endpoint) if err != nil { - r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err) + r.logger.Error("failed to dial endpoint", "peer", address.NodeID, "endpoint", endpoint, "err", err) } else { - r.logger.Info("connected to peer", "peer", address.NodeID, "endpoint", endpoint) + r.logger.Debug("dialed peer", "peer", address.NodeID, "endpoint", endpoint) return conn, nil } } - return nil, fmt.Errorf("failed to connect to peer via %q", address) + return nil, errors.New("all endpoints failed") } // handshakePeer handshakes with a peer, validating the peer's information. If -// expectID is given, we check that the peer's public key matches it. +// expectID is given, we check that the peer's info matches it. func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID NodeID) (NodeInfo, crypto.PubKey, error) { if r.options.HandshakeTimeout > 0 { var cancel context.CancelFunc @@ -484,51 +582,47 @@ func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID No if err != nil { return peerInfo, peerKey, err } + if err = peerInfo.Validate(); err != nil { return peerInfo, peerKey, fmt.Errorf("invalid handshake NodeInfo: %w", err) } - if expectID != "" && expectID != peerInfo.NodeID { - return peerInfo, peerKey, fmt.Errorf("expected to connect with peer %q, got %q", - expectID, peerInfo.NodeID) - } if NodeIDFromPubKey(peerKey) != peerInfo.NodeID { return peerInfo, peerKey, fmt.Errorf("peer's public key did not match its node ID %q (expected %q)", peerInfo.NodeID, NodeIDFromPubKey(peerKey)) } - if peerInfo.NodeID == r.nodeInfo.NodeID { - return peerInfo, peerKey, errors.New("rejecting handshake with self") + if expectID != "" && expectID != peerInfo.NodeID { + return peerInfo, peerKey, fmt.Errorf("expected to connect with peer %q, got %q", + expectID, peerInfo.NodeID) } return peerInfo, peerKey, nil } -// routePeer routes inbound messages from a peer to channels, and also 
sends -// outbound queued messages to the peer. It will close the connection and send -// queue, using this as a signal to coordinate the internal receivePeer() and -// sendPeer() goroutines. It blocks until the peer is done, e.g. when the -// connection or queue is closed. +// routePeer routes inbound and outbound messages between a peer and the reactor +// channels. It will close the given connection and send queue when done, or if +// they are closed elsewhere it will cause this method to shut down and return. func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) { - r.logger.Info("routing peer", "peer", peerID) - resultsCh := make(chan error, 2) + r.logger.Info("peer connected", "peer", peerID, "endpoint", conn) + errCh := make(chan error, 2) go func() { - resultsCh <- r.receivePeer(peerID, conn) + errCh <- r.receivePeer(peerID, conn) }() go func() { - resultsCh <- r.sendPeer(peerID, conn, sendQueue) + errCh <- r.sendPeer(peerID, conn, sendQueue) }() - err := <-resultsCh + err := <-errCh _ = conn.Close() sendQueue.close() - if e := <-resultsCh; err == nil { - // The first err was nil, so we update it with the second result, - // which may or may not be nil. + if e := <-errCh; err == nil { + // The first err was nil, so we update it with the second err, which may + // or may not be nil. 
err = e } switch err { - case nil, io.EOF, ErrTransportClosed{}: - r.logger.Info("peer disconnected", "peer", peerID) + case nil, io.EOF: + r.logger.Info("peer disconnected", "peer", peerID, "endpoint", conn) default: - r.logger.Error("peer failure", "peer", peerID, "err", err) + r.logger.Error("peer failure", "peer", peerID, "endpoint", conn, "err", err) } } @@ -546,7 +640,7 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { messageType := r.channelMessages[chID] r.channelMtx.RUnlock() if !ok { - r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID) + r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID) continue } @@ -564,10 +658,10 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { } select { - case queue.enqueue() <- Envelope{channelID: chID, From: peerID, Message: msg}: + case queue.enqueue() <- Envelope{From: peerID, Message: msg}: r.logger.Debug("received message", "peer", peerID, "message", msg) case <-queue.closed(): - r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID) + r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID) case <-r.stopCh: return nil } @@ -579,6 +673,10 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error { for { select { case envelope := <-queue.dequeue(): + if envelope.Message == nil { + r.logger.Error("dropping nil message", "peer", peerID) + continue + } bz, err := proto.Marshal(envelope.Message) if err != nil { r.logger.Error("failed to marshal message", "peer", peerID, "err", err) @@ -602,43 +700,57 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error { // evictPeers evicts connected peers as requested by the peer manager. 
func (r *Router) evictPeers() { + r.logger.Debug("starting evict routine") ctx := r.stopCtx() for { peerID, err := r.peerManager.EvictNext(ctx) - switch err { - case nil: - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): r.logger.Debug("stopping evict routine") return - default: + case err != nil: r.logger.Error("failed to find next peer to evict", "err", err) return } r.logger.Info("evicting peer", "peer", peerID) r.peerMtx.RLock() - if queue, ok := r.peerQueues[peerID]; ok { + queue, ok := r.peerQueues[peerID] + r.peerMtx.RUnlock() + if ok { queue.close() } - r.peerMtx.RUnlock() } } // OnStart implements service.Service. func (r *Router) OnStart() error { go r.dialPeers() + go r.evictPeers() for _, transport := range r.transports { go r.acceptPeers(transport) } - go r.evictPeers() return nil } // OnStop implements service.Service. // -// FIXME: This needs to close transports as well. +// All channels must be closed by OpenChannel() callers before stopping the +// router, to prevent blocked channel sends in reactors. Channels are not closed +// here, since that would cause any reactor senders to panic, so it is the +// sender's responsibility. func (r *Router) OnStop() { - // Collect all active queues, so we can wait for them to close. + // Signal router shutdown. + close(r.stopCh) + + // Close transport listeners (unblocks Accept calls). + for _, transport := range r.transports { + if err := transport.Close(); err != nil { + r.logger.Error("failed to close transport", "transport", transport, "err", err) + } + } + + // Collect all remaining queues, and wait for them to close. queues := []queue{} r.channelMtx.RLock() for _, q := range r.channelQueues { @@ -650,16 +762,12 @@ func (r *Router) OnStop() { queues = append(queues, q) } r.peerMtx.RUnlock() - - // Signal router shutdown, and wait for queues (and thus goroutines) - // to complete. 
- close(r.stopCh) for _, q := range queues { <-q.closed() } } -// stopCtx returns a context that is cancelled when the router stops. +// stopCtx returns a new context that is cancelled when the router stops. func (r *Router) stopCtx() context.Context { ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/p2p/router_test.go b/p2p/router_test.go index 01ad949d0..6d1c59bc2 100644 --- a/p2p/router_test.go +++ b/p2p/router_test.go @@ -2,158 +2,644 @@ package p2p_test import ( "errors" + "fmt" + "io" + "strings" + "sync" "testing" + "time" "github.com/fortytw2/leaktest" + "github.com/gogo/protobuf/proto" gogotypes "github.com/gogo/protobuf/types" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/mocks" + "github.com/tendermint/tendermint/p2p/p2ptest" ) -type TestMessage = gogotypes.StringValue - -func generateNode() (p2p.NodeInfo, crypto.PrivKey) { - privKey := ed25519.GenPrivKey() - nodeID := p2p.NodeIDFromPubKey(privKey.PubKey()) - nodeInfo := p2p.NodeInfo{ - NodeID: nodeID, - // FIXME: We have to fake a ListenAddr for now. 
- ListenAddr: "127.0.0.1:1234", - Moniker: "foo", - } - return nodeInfo, privKey -} - func echoReactor(channel *p2p.Channel) { for { select { - case envelope := <-channel.In(): - channel.Out() <- p2p.Envelope{ + case envelope := <-channel.In: + value := envelope.Message.(*p2ptest.Message).Value + channel.Out <- p2p.Envelope{ To: envelope.From, - Message: &TestMessage{Value: envelope.Message.(*TestMessage).Value}, + Message: &p2ptest.Message{Value: value}, } + case <-channel.Done(): return } } } -func TestRouter(t *testing.T) { - defer leaktest.Check(t)() +func TestRouter_Network(t *testing.T) { + t.Cleanup(leaktest.Check(t)) - logger := log.TestingLogger() - network := p2p.NewMemoryNetwork(logger) - nodeInfo, privKey := generateNode() - transport := network.CreateTransport(nodeInfo.NodeID) - defer transport.Close() - chID := p2p.ChannelID(1) + // Create a test network and open a channel where all peers run echoReactor. + network := p2ptest.MakeNetwork(t, 8) + local := network.RandomNode() + peers := network.Peers(local.NodeID) + channels := network.MakeChannels(t, 1, &p2ptest.Message{}) - // Start some other in-memory network nodes to communicate with, running - // a simple echo reactor that returns received messages. - peers := []p2p.NodeAddress{} - for i := 0; i < 3; i++ { - peerManager, err := p2p.NewPeerManager(dbm.NewMemDB(), p2p.PeerManagerOptions{}) - require.NoError(t, err) - peerInfo, peerKey := generateNode() - peerTransport := network.CreateTransport(peerInfo.NodeID) - defer peerTransport.Close() - peerRouter, err := p2p.NewRouter( - logger.With("peerID", i), - peerInfo, - peerKey, - peerManager, - []p2p.Transport{peerTransport}, - p2p.RouterOptions{}, + channel := channels[local.NodeID] + for _, peer := range peers { + go echoReactor(channels[peer.NodeID]) + } + + // Sending a message to each peer should work. 
+ for _, peer := range peers { + p2ptest.RequireSendReceive(t, channel, peer.NodeID, + &p2ptest.Message{Value: "foo"}, + &p2ptest.Message{Value: "foo"}, ) - require.NoError(t, err) - peers = append(peers, peerTransport.Endpoints()[0].NodeAddress(peerInfo.NodeID)) - - channel, err := peerRouter.OpenChannel(chID, &TestMessage{}) - require.NoError(t, err) - defer channel.Close() - go echoReactor(channel) - - err = peerRouter.Start() - require.NoError(t, err) - defer func() { require.NoError(t, peerRouter.Stop()) }() } - // Start the main router and connect it to the peers above. - peerManager, err := p2p.NewPeerManager(dbm.NewMemDB(), p2p.PeerManagerOptions{}) - require.NoError(t, err) - defer peerManager.Close() - for _, address := range peers { - err := peerManager.Add(address) - require.NoError(t, err) - } - peerUpdates := peerManager.Subscribe() - defer peerUpdates.Close() - - router, err := p2p.NewRouter(logger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{}) - require.NoError(t, err) - channel, err := router.OpenChannel(chID, &TestMessage{}) - require.NoError(t, err) - defer channel.Close() - - err = router.Start() - require.NoError(t, err) - defer func() { - // Since earlier defers are closed after this, and we have to make sure - // we close channels and subscriptions before the router, we explicitly - // close them here to. - peerUpdates.Close() - channel.Close() - require.NoError(t, router.Stop()) - }() - - // Wait for peers to come online, and ping them as they do. 
- for i := 0; i < len(peers); i++ { - peerUpdate := <-peerUpdates.Updates() - peerID := peerUpdate.NodeID - require.Equal(t, p2p.PeerUpdate{ - NodeID: peerID, - Status: p2p.PeerStatusUp, - }, peerUpdate) - - channel.Out() <- p2p.Envelope{To: peerID, Message: &TestMessage{Value: "hi!"}} - assert.Equal(t, p2p.Envelope{ - From: peerID, - Message: &TestMessage{Value: "hi!"}, - }, (<-channel.In()).Strip()) - } - - // We now send a broadcast, which we should return back from all peers. - channel.Out() <- p2p.Envelope{ + // Sending a broadcast should return back a message from all peers. + p2ptest.RequireSend(t, channel, p2p.Envelope{ Broadcast: true, - Message: &TestMessage{Value: "broadcast"}, - } - for i := 0; i < len(peers); i++ { - envelope := <-channel.In() - require.Equal(t, &TestMessage{Value: "broadcast"}, envelope.Message) + Message: &p2ptest.Message{Value: "bar"}, + }) + expect := []p2p.Envelope{} + for _, peer := range peers { + expect = append(expect, p2p.Envelope{ + From: peer.NodeID, + Message: &p2ptest.Message{Value: "bar"}, + }) } + p2ptest.RequireReceiveUnordered(t, channel, expect) - // We then submit an error for a peer, and watch it get disconnected. - channel.Error() <- p2p.PeerError{ + // We then submit an error for a peer, and watch it get disconnected and + // then reconnected as the router retries it. + peerUpdates := local.MakePeerUpdates(t) + channel.Error <- p2p.PeerError{ NodeID: peers[0].NodeID, - Err: errors.New("test error"), + Err: errors.New("boom"), } - peerUpdate := <-peerUpdates.Updates() - require.Equal(t, p2p.PeerUpdate{ - NodeID: peers[0].NodeID, - Status: p2p.PeerStatusDown, - }, peerUpdate) - - // The peer manager will automatically reconnect the peer, so we wait - // for that to happen. 
- peerUpdate = <-peerUpdates.Updates() - require.Equal(t, p2p.PeerUpdate{ - NodeID: peers[0].NodeID, - Status: p2p.PeerStatusUp, - }, peerUpdate) + p2ptest.RequireUpdates(t, peerUpdates, []p2p.PeerUpdate{ + {NodeID: peers[0].NodeID, Status: p2p.PeerStatusDown}, + {NodeID: peers[0].NodeID, Status: p2p.PeerStatusUp}, + }) +} + +func TestRouter_Channel(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Set up a router with no transports (so no peers). + peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, nil, p2p.RouterOptions{}) + require.NoError(t, err) + + require.NoError(t, router.Start()) + t.Cleanup(func() { + require.NoError(t, router.Stop()) + }) + + // Opening a channel should work. + channel, err := router.OpenChannel(chID, &p2ptest.Message{}) + require.NoError(t, err) + + // Opening the same channel again should fail. + _, err = router.OpenChannel(chID, &p2ptest.Message{}) + require.Error(t, err) + + // Opening a different channel should work. + _, err = router.OpenChannel(2, &p2ptest.Message{}) + require.NoError(t, err) + + // Closing the channel, then opening it again should be fine. + channel.Close() + time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async... + + channel, err = router.OpenChannel(chID, &p2ptest.Message{}) + require.NoError(t, err) + + // We should be able to send on the channel, even though there are no peers. + p2ptest.RequireSend(t, channel, p2p.Envelope{ + To: p2p.NodeID(strings.Repeat("a", 40)), + Message: &p2ptest.Message{Value: "foo"}, + }) + + // A message to ourselves should be dropped. + p2ptest.RequireSend(t, channel, p2p.Envelope{ + To: selfID, + Message: &p2ptest.Message{Value: "self"}, + }) + p2ptest.RequireEmpty(t, channel) +} + +// Channel tests are hairy to mock, so we use an in-memory network instead. 
+func TestRouter_Channel_SendReceive(t *testing.T) {
+ t.Cleanup(leaktest.Check(t))
+
+ // Create a test network and open a channel on all nodes.
+ network := p2ptest.MakeNetwork(t, 3)
+ ids := network.NodeIDs()
+ aID, bID, cID := ids[0], ids[1], ids[2]
+ channels := network.MakeChannels(t, chID, &p2ptest.Message{})
+ a, b, c := channels[aID], channels[bID], channels[cID]
+ otherChannels := network.MakeChannels(t, 9, &p2ptest.Message{})
+
+ // Sending a message a->b should work, and not send anything
+ // further to a, b, or c.
+ p2ptest.RequireSend(t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}})
+ p2ptest.RequireReceive(t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "foo"}})
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // Sending a nil message a->b should be dropped.
+ p2ptest.RequireSend(t, a, p2p.Envelope{To: bID, Message: nil})
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // Sending a different message type should be dropped.
+ p2ptest.RequireSend(t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}})
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // Sending to an unknown peer should be dropped.
+ p2ptest.RequireSend(t, a, p2p.Envelope{
+ To: p2p.NodeID(strings.Repeat("a", 40)),
+ Message: &p2ptest.Message{Value: "a"},
+ })
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // Sending without a recipient should be dropped.
+ p2ptest.RequireSend(t, a, p2p.Envelope{Message: &p2ptest.Message{Value: "noto"}})
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // Sending to self should be dropped.
+ p2ptest.RequireSend(t, a, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "self"}})
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // Removing b and sending to it should be dropped.
+ network.Remove(t, bID)
+ p2ptest.RequireSend(t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "nob"}})
+ p2ptest.RequireEmpty(t, a, b, c)
+
+ // After all this, sending a message c->a should work.
+ p2ptest.RequireSend(t, c, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "bar"}}) + p2ptest.RequireReceive(t, a, p2p.Envelope{From: cID, Message: &p2ptest.Message{Value: "bar"}}) + p2ptest.RequireEmpty(t, a, b, c) + + // None of these messages should have made it onto the other channels. + for _, other := range otherChannels { + p2ptest.RequireEmpty(t, other) + } +} + +func TestRouter_Channel_Broadcast(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Create a test network and open a channel on all nodes. + network := p2ptest.MakeNetwork(t, 4) + ids := network.NodeIDs() + aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3] + channels := network.MakeChannels(t, 1, &p2ptest.Message{}) + a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID] + + // Sending a broadcast from b should work. + p2ptest.RequireSend(t, b, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "foo"}}) + p2ptest.RequireReceive(t, a, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}}) + p2ptest.RequireReceive(t, c, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}}) + p2ptest.RequireReceive(t, d, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}}) + p2ptest.RequireEmpty(t, a, b, c, d) + + // Removing one node from the network shouldn't prevent broadcasts from working. + network.Remove(t, dID) + p2ptest.RequireSend(t, a, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "bar"}}) + p2ptest.RequireReceive(t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}}) + p2ptest.RequireReceive(t, c, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}}) + p2ptest.RequireEmpty(t, a, b, c, d) +} + +func TestRouter_Channel_Wrapper(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Create a test network and open a channel on all nodes. 
+ network := p2ptest.MakeNetwork(t, 2) + ids := network.NodeIDs() + aID, bID := ids[0], ids[1] + channels := network.MakeChannels(t, 1, &wrapperMessage{}) + a, b := channels[aID], channels[bID] + + // Since wrapperMessage implements p2p.Wrapper and handles Message, it + // should automatically wrap and unwrap sent messages -- we prepend the + // wrapper actions to the message value to signal this. + p2ptest.RequireSend(t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}}) + p2ptest.RequireReceive(t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "unwrap:wrap:foo"}}) + + // If we send a different message that can't be wrapped, it should be dropped. + p2ptest.RequireSend(t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}}) + p2ptest.RequireEmpty(t, b) + + // If we send the wrapper message itself, it should also be passed through + // since WrapperMessage supports it, and should only be unwrapped at the receiver. + p2ptest.RequireSend(t, a, p2p.Envelope{ + To: bID, + Message: &wrapperMessage{Message: p2ptest.Message{Value: "foo"}}, + }) + p2ptest.RequireReceive(t, b, p2p.Envelope{ + From: aID, + Message: &p2ptest.Message{Value: "unwrap:foo"}, + }) + +} + +// WrapperMessage prepends the value with "wrap:" and "unwrap:" to test it. +type wrapperMessage struct { + p2ptest.Message +} + +var _ p2p.Wrapper = (*wrapperMessage)(nil) + +func (w *wrapperMessage) Wrap(inner proto.Message) error { + switch inner := inner.(type) { + case *p2ptest.Message: + w.Message.Value = fmt.Sprintf("wrap:%v", inner.Value) + case *wrapperMessage: + *w = *inner + default: + return fmt.Errorf("invalid message type %T", inner) + } + return nil +} + +func (w *wrapperMessage) Unwrap() (proto.Message, error) { + return &p2ptest.Message{Value: fmt.Sprintf("unwrap:%v", w.Message.Value)}, nil +} + +func TestRouter_Channel_Error(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Create a test network and open a channel on all nodes. 
+ network := p2ptest.MakeNetwork(t, 3) + ids := network.NodeIDs() + aID, bID := ids[0], ids[1] + channels := network.MakeChannels(t, 1, &p2ptest.Message{}) + a := channels[aID] + + // Erroring b should cause it to be disconnected. It will reconnect shortly after. + sub := network.Nodes[aID].MakePeerUpdates(t) + p2ptest.RequireError(t, a, p2p.PeerError{NodeID: bID, Err: errors.New("boom")}) + p2ptest.RequireUpdates(t, sub, []p2p.PeerUpdate{ + {NodeID: bID, Status: p2p.PeerStatusDown}, + {NodeID: bID, Status: p2p.PeerStatusUp}, + }) +} + +func TestRouter_AcceptPeers(t *testing.T) { + testcases := map[string]struct { + peerInfo p2p.NodeInfo + peerKey crypto.PubKey + ok bool + }{ + "valid handshake": {peerInfo, peerKey.PubKey(), true}, + "empty handshake": {p2p.NodeInfo{}, nil, false}, + "invalid key": {peerInfo, selfKey.PubKey(), false}, + "self handshake": {selfInfo, selfKey.PubKey(), false}, + } + for name, tc := range testcases { + tc := tc + t.Run(name, func(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Set up a mock transport that handshakes. + closer := tmsync.NewCloser() + mockConnection := &mocks.Connection{} + mockConnection.On("String").Maybe().Return("mock") + mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + Return(tc.peerInfo, tc.peerKey, nil) + mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil) + if tc.ok { + mockConnection.On("ReceiveMessage").Return(chID, nil, io.EOF) + } + + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Close").Return(nil) + mockTransport.On("Accept").Once().Return(mockConnection, nil) + mockTransport.On("Accept").Once().Return(nil, io.EOF) + + // Set up and start the router. 
+ peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + sub := peerManager.Subscribe() + defer sub.Close() + + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + require.NoError(t, router.Start()) + + if tc.ok { + p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{ + NodeID: tc.peerInfo.NodeID, + Status: p2p.PeerStatusUp, + }) + sub.Close() + } else { + select { + case <-closer.Done(): + case <-time.After(100 * time.Millisecond): + require.Fail(t, "connection not closed") + } + } + + require.NoError(t, router.Stop()) + mockTransport.AssertExpectations(t) + mockConnection.AssertExpectations(t) + }) + } +} + +func TestRouter_AcceptPeers_Error(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Set up a mock transport that returns an error, which should prevent + // the router from calling Accept again. + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Accept").Once().Return(nil, errors.New("boom")) + mockTransport.On("Close").Return(nil) + + // Set up and start the router. + peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + + require.NoError(t, router.Start()) + time.Sleep(time.Second) + require.NoError(t, router.Stop()) + + mockTransport.AssertExpectations(t) +} + +func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Set up a mock transport that returns io.EOF once, which should prevent + // the router from calling Accept again. 
+ mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Accept").Once().Return(nil, io.EOF) + mockTransport.On("Close").Return(nil) + + // Set up and start the router. + peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + + require.NoError(t, router.Start()) + time.Sleep(time.Second) + require.NoError(t, router.Stop()) + + mockTransport.AssertExpectations(t) +} + +func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Set up a mock transport that returns a connection that blocks during the + // handshake. It should be able to accept several of these in parallel, i.e. + // a single connection can't halt other connections being accepted. + acceptCh := make(chan bool, 3) + closeCh := make(chan time.Time) + + mockConnection := &mocks.Connection{} + mockConnection.On("String").Maybe().Return("mock") + mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + WaitUntil(closeCh).Return(p2p.NodeInfo{}, nil, io.EOF) + mockConnection.On("Close").Return(nil) + + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Close").Return(nil) + mockTransport.On("Accept").Times(3).Run(func(_ mock.Arguments) { + acceptCh <- true + }).Return(mockConnection, nil) + mockTransport.On("Accept").Once().Return(nil, io.EOF) + + // Set up and start the router. 
+ peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + require.NoError(t, router.Start()) + + require.Eventually(t, func() bool { + return len(acceptCh) == 3 + }, time.Second, 10*time.Millisecond) + close(closeCh) + time.Sleep(100 * time.Millisecond) + + require.NoError(t, router.Stop()) + mockTransport.AssertExpectations(t) + mockConnection.AssertExpectations(t) +} + +func TestRouter_DialPeers(t *testing.T) { + testcases := map[string]struct { + dialID p2p.NodeID + peerInfo p2p.NodeInfo + peerKey crypto.PubKey + dialErr error + ok bool + }{ + "valid dial": {peerInfo.NodeID, peerInfo, peerKey.PubKey(), nil, true}, + "empty handshake": {peerInfo.NodeID, p2p.NodeInfo{}, nil, nil, false}, + "invalid key": {peerInfo.NodeID, peerInfo, selfKey.PubKey(), nil, false}, + "unexpected node ID": {peerInfo.NodeID, selfInfo, selfKey.PubKey(), nil, false}, + "dial error": {peerInfo.NodeID, peerInfo, peerKey.PubKey(), errors.New("boom"), false}, + } + for name, tc := range testcases { + tc := tc + t.Run(name, func(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + address := p2p.NodeAddress{Protocol: "mock", NodeID: tc.dialID} + endpoint := p2p.Endpoint{Protocol: "mock", Path: string(tc.dialID)} + + // Set up a mock transport that handshakes. + closer := tmsync.NewCloser() + mockConnection := &mocks.Connection{} + mockConnection.On("String").Maybe().Return("mock") + if tc.dialErr == nil { + mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). 
+ Return(tc.peerInfo, tc.peerKey, nil) + mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil) + } + if tc.ok { + mockConnection.On("ReceiveMessage").Return(chID, nil, io.EOF) + } + + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Close").Return(nil) + mockTransport.On("Accept").Maybe().Return(nil, io.EOF) + if tc.dialErr == nil { + mockTransport.On("Dial", mock.Anything, endpoint).Once().Return(mockConnection, nil) + // This handles the retry when a dialed connection gets closed after ReceiveMessage + // returns io.EOF above. + mockTransport.On("Dial", mock.Anything, endpoint).Maybe().Return(nil, io.EOF) + } else { + mockTransport.On("Dial", mock.Anything, endpoint).Once(). + Run(func(_ mock.Arguments) { closer.Close() }). + Return(nil, tc.dialErr) + } + + // Set up and start the router. + peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + require.NoError(t, peerManager.Add(address)) + sub := peerManager.Subscribe() + defer sub.Close() + + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + require.NoError(t, router.Start()) + + if tc.ok { + p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{ + NodeID: tc.peerInfo.NodeID, + Status: p2p.PeerStatusUp, + }) + sub.Close() + } else { + select { + case <-closer.Done(): + case <-time.After(100 * time.Millisecond): + require.Fail(t, "connection not closed") + } + } + + require.NoError(t, router.Stop()) + mockTransport.AssertExpectations(t) + mockConnection.AssertExpectations(t) + }) + } +} + +func TestRouter_DialPeers_Parallel(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + a := p2p.NodeAddress{Protocol: "mock", NodeID: p2p.NodeID(strings.Repeat("a", 40))} + b := p2p.NodeAddress{Protocol: "mock", 
NodeID: p2p.NodeID(strings.Repeat("b", 40))} + c := p2p.NodeAddress{Protocol: "mock", NodeID: p2p.NodeID(strings.Repeat("c", 40))} + + // Set up a mock transport that returns a connection that blocks during the + // handshake. It should dial all peers in parallel. + dialCh := make(chan bool, 3) + closeCh := make(chan time.Time) + + mockConnection := &mocks.Connection{} + mockConnection.On("String").Maybe().Return("mock") + mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + WaitUntil(closeCh).Return(p2p.NodeInfo{}, nil, io.EOF) + mockConnection.On("Close").Return(nil) + + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Close").Return(nil) + mockTransport.On("Accept").Once().Return(nil, io.EOF) + for _, address := range []p2p.NodeAddress{a, b, c} { + endpoint := p2p.Endpoint{Protocol: address.Protocol, Path: string(address.NodeID)} + mockTransport.On("Dial", mock.Anything, endpoint).Run(func(_ mock.Arguments) { + dialCh <- true + }).Return(mockConnection, nil) + } + + // Set up and start the router. 
+ peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + require.NoError(t, peerManager.Add(a)) + require.NoError(t, peerManager.Add(b)) + require.NoError(t, peerManager.Add(c)) + + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + require.NoError(t, router.Start()) + + require.Eventually(t, func() bool { + return len(dialCh) == 3 + }, time.Second, 10*time.Millisecond) + close(closeCh) + time.Sleep(100 * time.Millisecond) + + require.NoError(t, router.Stop()) + mockTransport.AssertExpectations(t) + mockConnection.AssertExpectations(t) +} + +func TestRouter_EvictPeers(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + // Set up a mock transport that we can evict. + closeCh := make(chan time.Time) + closeOnce := sync.Once{} + + mockConnection := &mocks.Connection{} + mockConnection.On("String").Maybe().Return("mock") + mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + Return(peerInfo, peerKey.PubKey(), nil) + mockConnection.On("ReceiveMessage").WaitUntil(closeCh).Return(chID, nil, io.EOF) + mockConnection.On("Close").Run(func(_ mock.Arguments) { + closeOnce.Do(func() { + close(closeCh) + }) + }).Return(nil) + + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"}) + mockTransport.On("Close").Return(nil) + mockTransport.On("Accept").Once().Return(mockConnection, nil) + mockTransport.On("Accept").Once().Return(nil, io.EOF) + + // Set up and start the router. 
+ peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) + sub := peerManager.Subscribe() + defer sub.Close() + + router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, + []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + require.NoError(t, err) + require.NoError(t, router.Start()) + + // Wait for the mock peer to connect, then evict it by reporting an error. + p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{ + NodeID: peerInfo.NodeID, + Status: p2p.PeerStatusUp, + }) + + require.NoError(t, peerManager.Errored(peerInfo.NodeID, errors.New("boom"))) + + p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{ + NodeID: peerInfo.NodeID, + Status: p2p.PeerStatusDown, + }) + sub.Close() + + require.NoError(t, router.Stop()) + mockTransport.AssertExpectations(t) + mockConnection.AssertExpectations(t) } diff --git a/p2p/shim.go b/p2p/shim.go index a349849c7..a71a1df54 100644 --- a/p2p/shim.go +++ b/p2p/shim.go @@ -39,6 +39,9 @@ type ( ChannelShim struct { Descriptor *ChannelDescriptor Channel *Channel + inCh chan<- Envelope + outCh <-chan Envelope + errCh <-chan PeerError } // ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel @@ -56,7 +59,7 @@ func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*C for _, cds := range descriptors { chShim := NewChannelShim(cds, 0) - channels[chShim.Channel.id] = chShim + channels[chShim.Channel.ID] = chShim } rs := &ReactorShim{ @@ -72,15 +75,21 @@ func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*C } func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim { + inCh := make(chan Envelope, buf) + outCh := make(chan Envelope, buf) + errCh := make(chan PeerError, buf) return &ChannelShim{ Descriptor: cds.Descriptor, Channel: NewChannel( ChannelID(cds.Descriptor.ID), cds.MsgType, - make(chan Envelope, buf), - make(chan Envelope, buf), - make(chan PeerError, buf), + inCh, + 
outCh, + errCh, ), + inCh: inCh, + outCh: outCh, + errCh: errCh, } } @@ -91,7 +100,7 @@ func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim { func (rs *ReactorShim) proxyPeerEnvelopes() { for _, cs := range rs.Channels { go func(cs *ChannelShim) { - for e := range cs.Channel.outCh { + for e := range cs.outCh { msg := proto.Clone(cs.Channel.messageType) msg.Reset() @@ -161,7 +170,7 @@ func (rs *ReactorShim) proxyPeerEnvelopes() { func (rs *ReactorShim) handlePeerErrors() { for _, cs := range rs.Channels { go func(cs *ChannelShim) { - for pErr := range cs.Channel.errCh { + for pErr := range cs.errCh { if pErr.NodeID != "" { peer := rs.Switch.peers.Get(pErr.NodeID) if peer == nil { @@ -311,7 +320,7 @@ func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) { } select { - case channelShim.Channel.inCh <- Envelope{From: src.ID(), Message: msg}: + case channelShim.inCh <- Envelope{From: src.ID(), Message: msg}: rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", src.ID()) case <-channelShim.Channel.Done(): diff --git a/p2p/shim_test.go b/p2p/shim_test.go index a12a184cb..b91af366d 100644 --- a/p2p/shim_test.go +++ b/p2p/shim_test.go @@ -92,7 +92,7 @@ func TestReactorShim_GetChannel(t *testing.T) { p2pCh := rts.shim.GetChannel(p2p.ChannelID(channelID1)) require.NotNil(t, p2pCh) - require.Equal(t, p2pCh.ID(), p2p.ChannelID(channelID1)) + require.Equal(t, p2pCh.ID, p2p.ChannelID(channelID1)) p2pCh = rts.shim.GetChannel(p2p.ChannelID(byte(0x03))) require.Nil(t, p2pCh) @@ -178,11 +178,11 @@ func TestReactorShim_Receive(t *testing.T) { // Simulate receiving the envelope in some real reactor and replying back with // the same envelope and then closing the Channel. 
go func() { - e := <-p2pCh.Channel.In() + e := <-p2pCh.Channel.In require.Equal(t, peerIDA, e.From) require.NotNil(t, e.Message) - p2pCh.Channel.Out() <- p2p.Envelope{To: e.From, Message: e.Message} + p2pCh.Channel.Out <- p2p.Envelope{To: e.From, Message: e.Message} p2pCh.Channel.Close() wg.Done() }() @@ -200,7 +200,7 @@ func TestReactorShim_Receive(t *testing.T) { // Since p2pCh was closed in the simulated reactor above, calling Receive // should not block. rts.shim.Receive(channelID1, peerA, bz) - require.Empty(t, p2pCh.Channel.In()) + require.Empty(t, p2pCh.Channel.In) peerA.AssertExpectations(t) } diff --git a/p2p/transport.go b/p2p/transport.go index dc01e5efc..ff782f966 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -10,6 +10,8 @@ import ( "github.com/tendermint/tendermint/p2p/conn" ) +//go:generate mockery --case underscore --name Transport|Connection + const ( // defaultProtocol is the default protocol used for NodeAddress when // a protocol isn't explicitly given as a URL scheme. diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 56aa3d92b..657cb4b35 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -19,11 +19,8 @@ import ( // transportFactory is used to set up transports for tests. type transportFactory func(t *testing.T) p2p.Transport -var ( - ctx = context.Background() // convenience context - chID = p2p.ChannelID(1) // channel ID for use in tests - testTransports = map[string]transportFactory{} // registry for withTransports -) +// testTransports is a registry of transport factories for withTransports(). +var testTransports = map[string]transportFactory{} // withTransports is a test helper that runs a test against all transports // registered in testTransports. 
diff --git a/statesync/reactor.go b/statesync/reactor.go index f88e2d1e0..a7e536b0f 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -170,7 +170,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { "height", snapshot.Height, "format", snapshot.Format, ) - r.snapshotCh.Out() <- p2p.Envelope{ + r.snapshotCh.Out <- p2p.Envelope{ To: envelope.From, Message: &ssproto.SnapshotsResponse{ Height: snapshot.Height, @@ -254,7 +254,7 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { "chunk", msg.Index, "peer", envelope.From, ) - r.chunkCh.Out() <- p2p.Envelope{ + r.chunkCh.Out <- p2p.Envelope{ To: envelope.From, Message: &ssproto.ChunkResponse{ Height: msg.Height, @@ -343,10 +343,10 @@ func (r *Reactor) processSnapshotCh() { for { select { - case envelope := <-r.snapshotCh.In(): - if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err) - r.snapshotCh.Error() <- p2p.PeerError{ + case envelope := <-r.snapshotCh.In: + if err := r.handleMessage(r.snapshotCh.ID, envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID, "envelope", envelope, "err", err) + r.snapshotCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } @@ -369,10 +369,10 @@ func (r *Reactor) processChunkCh() { for { select { - case envelope := <-r.chunkCh.In(): - if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err) - r.chunkCh.Error() <- p2p.PeerError{ + case envelope := <-r.chunkCh.In: + if err := r.handleMessage(r.chunkCh.ID, envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID, "envelope", envelope, "err", err) + r.chunkCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } @@ -470,12 +470,12 @@ func (r *Reactor) Sync(stateProvider 
StateProvider, discoveryTime time.Duration) return sm.State{}, nil, errors.New("a state sync is already in progress") } - r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out(), r.chunkCh.Out(), r.tempDir) + r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir) r.mtx.Unlock() // request snapshots from all currently connected peers r.Logger.Debug("requesting snapshots from known peers") - r.snapshotCh.Out() <- p2p.Envelope{ + r.snapshotCh.Out <- p2p.Envelope{ Broadcast: true, Message: &ssproto.SnapshotsRequest{}, } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 4ad921e9e..d094e880a 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -791,7 +791,7 @@ func NewNode(config *cfg.Config, // TODO: Fetch and provide real options and do proper p2p bootstrapping. // TODO: Use a persistent peer database. - peerMgr, err := p2p.NewPeerManager(dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerMgr, err := p2p.NewPeerManager(nodeKey.ID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) if err != nil { return nil, err }