From 5bf30bb0491f28b0377657df5ead0c88d8384a50 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 6 Oct 2021 15:17:44 -0400 Subject: [PATCH] p2p: cleanup transport interface (#7071) This is another batch of things to cleanup in the legacy P2P system. --- CHANGELOG_PENDING.md | 2 +- .../commands/reset_priv_validator.go | 17 +-- cmd/tendermint/commands/run_node.go | 2 - cmd/tendermint/commands/testnet.go | 1 - config/config.go | 105 ++------------ config/config_test.go | 20 --- config/toml.go | 50 ------- internal/p2p/conn/connection.go | 133 ------------------ internal/p2p/conn/connection_test.go | 32 +---- internal/p2p/conn_set.go | 82 ----------- internal/p2p/mock/peer.go | 9 +- internal/p2p/mocks/connection.go | 69 +-------- internal/p2p/mocks/peer.go | 15 -- internal/p2p/router.go | 3 +- internal/p2p/shim.go | 13 -- internal/p2p/transport.go | 27 +--- internal/p2p/transport_mconn.go | 49 ++----- internal/p2p/transport_mconn_test.go | 1 - internal/p2p/transport_memory.go | 40 +----- internal/p2p/transport_test.go | 51 ++----- internal/p2p/types.go | 1 - node/node.go | 4 - node/setup.go | 22 +-- rpc/client/helpers.go | 2 +- test/e2e/runner/setup.go | 1 - 25 files changed, 52 insertions(+), 699 deletions(-) delete mode 100644 internal/p2p/conn_set.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index d80e3ae61..9d7ea96ae 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -17,7 +17,7 @@ Special thanks to external contributors on this release: - P2P Protocol - [p2p] \#7035 Remove legacy P2P routing implementation and - associated configuration (@tychoish) + associated configuration options (@tychoish) - Go API diff --git a/cmd/tendermint/commands/reset_priv_validator.go b/cmd/tendermint/commands/reset_priv_validator.go index 8745e55d8..5f3e54700 100644 --- a/cmd/tendermint/commands/reset_priv_validator.go +++ b/cmd/tendermint/commands/reset_priv_validator.go @@ -37,7 +37,7 @@ var ResetPrivValidatorCmd = &cobra.Command{ // XXX: this is totally unsafe. // it's only suitable for testnets. func resetAll(cmd *cobra.Command, args []string) error { - return ResetAll(config.DBDir(), config.P2P.AddrBookFile(), config.PrivValidator.KeyFile(), + return ResetAll(config.DBDir(), config.PrivValidator.KeyFile(), config.PrivValidator.StateFile(), logger) } @@ -49,12 +49,7 @@ func resetPrivValidator(cmd *cobra.Command, args []string) error { // ResetAll removes address book files plus all data, and resets the privValdiator data. // Exported so other CLI tools can use it. -func ResetAll(dbDir, addrBookFile, privValKeyFile, privValStateFile string, logger log.Logger) error { - if keepAddrBook { - logger.Info("The address book remains intact") - } else { - removeAddrBook(addrBookFile, logger) - } +func ResetAll(dbDir, privValKeyFile, privValStateFile string, logger log.Logger) error { if err := os.RemoveAll(dbDir); err == nil { logger.Info("Removed all blockchain history", "dir", dbDir) } else { @@ -87,11 +82,3 @@ func resetFilePV(privValKeyFile, privValStateFile string, logger log.Logger) err } return nil } - -func removeAddrBook(addrBookFile string, logger log.Logger) { - if err := os.Remove(addrBookFile); err == nil { - logger.Info("Removed existing address book", "file", addrBookFile) - } else if !os.IsNotExist(err) { - logger.Info("Error removing address book", "file", addrBookFile, "err", err) - } -} diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index 1e310f5eb..0c6d3271e 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -84,8 +84,6 @@ func AddNodeFlags(cmd *cobra.Command) { "node listen address. (0.0.0.0:0 means any interface, any port)") cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "comma-delimited ID@host:port seed nodes") cmd.Flags().String("p2p.persistent-peers", config.P2P.PersistentPeers, "comma-delimited ID@host:port persistent peers") - cmd.Flags().String("p2p.unconditional-peer-ids", - config.P2P.UnconditionalPeerIDs, "comma-delimited IDs of unconditional peers") cmd.Flags().Bool("p2p.upnp", config.P2P.UPNP, "enable/disable UPNP port forwarding") cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "enable/disable Peer-Exchange") cmd.Flags().String("p2p.private-peer-ids", config.P2P.PrivatePeerIDs, "comma-delimited private peer IDs") diff --git a/cmd/tendermint/commands/testnet.go b/cmd/tendermint/commands/testnet.go index a7307b38f..ef46f5428 100644 --- a/cmd/tendermint/commands/testnet.go +++ b/cmd/tendermint/commands/testnet.go @@ -226,7 +226,6 @@ func testnetFiles(cmd *cobra.Command, args []string) error { for i := 0; i < nValidators+nNonValidators; i++ { nodeDir := filepath.Join(outputDir, fmt.Sprintf("%s%d", nodeDirPrefix, i)) config.SetRoot(nodeDir) - config.P2P.AddrBookStrict = false config.P2P.AllowDuplicateIP = true if populatePersistentPeers { persistentPeersWithoutSelf := make([]string, 0) diff --git a/config/config.go b/config/config.go index 349e5a6f7..0e204b6e6 100644 --- a/config/config.go +++ b/config/config.go @@ -53,16 +53,14 @@ var ( defaultPrivValKeyName = "priv_validator_key.json" defaultPrivValStateName = "priv_validator_state.json" - defaultNodeKeyName = "node_key.json" - defaultAddrBookName = "addrbook.json" + defaultNodeKeyName = "node_key.json" defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName) defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName) defaultPrivValKeyPath = filepath.Join(defaultConfigDir, defaultPrivValKeyName) defaultPrivValStatePath = filepath.Join(defaultDataDir, defaultPrivValStateName) - defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName) - defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName) + defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName) ) // Config defines the top level configuration for a Tendermint node @@ -141,9 +139,6 @@ func (cfg *Config) ValidateBasic() error { if err := cfg.RPC.ValidateBasic(); err != nil { return fmt.Errorf("error in [rpc] section: %w", err) } - if err := cfg.P2P.ValidateBasic(); err != nil { - return fmt.Errorf("error in [p2p] section: %w", err) - } if err := cfg.Mempool.ValidateBasic(); err != nil { return fmt.Errorf("error in [mempool] section: %w", err) } @@ -646,25 +641,6 @@ type P2PConfig struct { //nolint: maligned // UPNP port forwarding UPNP bool `mapstructure:"upnp"` - // Path to address book - AddrBook string `mapstructure:"addr-book-file"` - - // Set true for strict address routability rules - // Set false for private or local networks - AddrBookStrict bool `mapstructure:"addr-book-strict"` - - // Maximum number of inbound peers - // - // TODO: Remove once p2p refactor is complete in favor of MaxConnections. - // ref: https://github.com/tendermint/tendermint/issues/5670 - MaxNumInboundPeers int `mapstructure:"max-num-inbound-peers"` - - // Maximum number of outbound peers to connect to, excluding persistent peers. - // - // TODO: Remove once p2p refactor is complete in favor of MaxConnections. - // ref: https://github.com/tendermint/tendermint/issues/5670 - MaxNumOutboundPeers int `mapstructure:"max-num-outbound-peers"` - // MaxConnections defines the maximum number of connected peers (inbound and // outbound). MaxConnections uint16 `mapstructure:"max-connections"` @@ -673,24 +649,6 @@ type P2PConfig struct { //nolint: maligned // attempts per IP address. MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"` - // List of node IDs, to which a connection will be (re)established ignoring any existing limits - UnconditionalPeerIDs string `mapstructure:"unconditional-peer-ids"` - - // Maximum pause when redialing a persistent peer (if zero, exponential backoff is used) - PersistentPeersMaxDialPeriod time.Duration `mapstructure:"persistent-peers-max-dial-period"` - - // Time to wait before flushing messages out on the connection - FlushThrottleTimeout time.Duration `mapstructure:"flush-throttle-timeout"` - - // Maximum size of a message packet payload, in bytes - MaxPacketMsgPayloadSize int `mapstructure:"max-packet-msg-payload-size"` - - // Rate at which packets can be sent, in bytes/second - SendRate int64 `mapstructure:"send-rate"` - - // Rate at which packets can be received, in bytes/second - RecvRate int64 `mapstructure:"recv-rate"` - // Set true to enable the peer-exchange reactor PexReactor bool `mapstructure:"pex"` @@ -721,28 +679,14 @@ func DefaultP2PConfig() *P2PConfig { ListenAddress: "tcp://0.0.0.0:26656", ExternalAddress: "", UPNP: false, - AddrBook: defaultAddrBookPath, - AddrBookStrict: true, - MaxNumInboundPeers: 40, - MaxNumOutboundPeers: 10, MaxConnections: 64, MaxIncomingConnectionAttempts: 100, - PersistentPeersMaxDialPeriod: 0 * time.Second, - FlushThrottleTimeout: 100 * time.Millisecond, - // The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes. - // The IP header and the TCP header take up 20 bytes each at least (unless - // optional header fields are used) and thus the max for (non-Jumbo frame) - // Ethernet is 1500 - 20 -20 = 1460 - // Source: https://stackoverflow.com/a/3074427/820520 - MaxPacketMsgPayloadSize: 1400, - SendRate: 5120000, // 5 mB/s - RecvRate: 5120000, // 5 mB/s - PexReactor: true, - AllowDuplicateIP: false, - HandshakeTimeout: 20 * time.Second, - DialTimeout: 3 * time.Second, - TestDialFail: false, - QueueType: "priority", + PexReactor: true, + AllowDuplicateIP: false, + HandshakeTimeout: 20 * time.Second, + DialTimeout: 3 * time.Second, + TestDialFail: false, + QueueType: "priority", } } @@ -750,43 +694,10 @@ func DefaultP2PConfig() *P2PConfig { func TestP2PConfig() *P2PConfig { cfg := DefaultP2PConfig() cfg.ListenAddress = "tcp://127.0.0.1:36656" - cfg.FlushThrottleTimeout = 10 * time.Millisecond cfg.AllowDuplicateIP = true return cfg } -// AddrBookFile returns the full path to the address book -func (cfg *P2PConfig) AddrBookFile() string { - return rootify(cfg.AddrBook, cfg.RootDir) -} - -// ValidateBasic performs basic validation (checking param bounds, etc.) and -// returns an error if any check fails. -func (cfg *P2PConfig) ValidateBasic() error { - if cfg.MaxNumInboundPeers < 0 { - return errors.New("max-num-inbound-peers can't be negative") - } - if cfg.MaxNumOutboundPeers < 0 { - return errors.New("max-num-outbound-peers can't be negative") - } - if cfg.FlushThrottleTimeout < 0 { - return errors.New("flush-throttle-timeout can't be negative") - } - if cfg.PersistentPeersMaxDialPeriod < 0 { - return errors.New("persistent-peers-max-dial-period can't be negative") - } - if cfg.MaxPacketMsgPayloadSize < 0 { - return errors.New("max-packet-msg-payload-size can't be negative") - } - if cfg.SendRate < 0 { - return errors.New("send-rate can't be negative") - } - if cfg.RecvRate < 0 { - return errors.New("recv-rate can't be negative") - } - return nil -} - //----------------------------------------------------------------------------- // MempoolConfig diff --git a/config/config_test.go b/config/config_test.go index 79d627910..db0fb5967 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -82,26 +82,6 @@ func TestRPCConfigValidateBasic(t *testing.T) { } } -func TestP2PConfigValidateBasic(t *testing.T) { - cfg := TestP2PConfig() - assert.NoError(t, cfg.ValidateBasic()) - - fieldsToTest := []string{ - "MaxNumInboundPeers", - "MaxNumOutboundPeers", - "FlushThrottleTimeout", - "MaxPacketMsgPayloadSize", - "SendRate", - "RecvRate", - } - - for _, fieldName := range fieldsToTest { - reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(-1) - assert.Error(t, cfg.ValidateBasic()) - reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(0) - } -} - func TestMempoolConfigValidateBasic(t *testing.T) { cfg := TestMempoolConfig() assert.NoError(t, cfg.ValidateBasic()) diff --git a/config/toml.go b/config/toml.go index ee4c30004..1159640fb 100644 --- a/config/toml.go +++ b/config/toml.go @@ -296,62 +296,12 @@ persistent-peers = "{{ .P2P.PersistentPeers }}" # UPNP port forwarding upnp = {{ .P2P.UPNP }} -# Path to address book -# TODO: Remove once p2p refactor is complete in favor of peer store. -addr-book-file = "{{ js .P2P.AddrBook }}" - -# Set true for strict address routability rules -# Set false for private or local networks -addr-book-strict = {{ .P2P.AddrBookStrict }} - -# Maximum number of inbound peers -# -# TODO: Remove once p2p refactor is complete in favor of MaxConnections. -# ref: https://github.com/tendermint/tendermint/issues/5670 -max-num-inbound-peers = {{ .P2P.MaxNumInboundPeers }} - -# Maximum number of outbound peers to connect to, excluding persistent peers -# -# TODO: Remove once p2p refactor is complete in favor of MaxConnections. -# ref: https://github.com/tendermint/tendermint/issues/5670 -max-num-outbound-peers = {{ .P2P.MaxNumOutboundPeers }} - # Maximum number of connections (inbound and outbound). max-connections = {{ .P2P.MaxConnections }} # Rate limits the number of incoming connection attempts per IP address. max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }} -# List of node IDs, to which a connection will be (re)established ignoring any existing limits -# TODO: Remove once p2p refactor is complete. -# ref: https://github.com/tendermint/tendermint/issues/5670 -unconditional-peer-ids = "{{ .P2P.UnconditionalPeerIDs }}" - -# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used) -# TODO: Remove once p2p refactor is complete -# ref: https:#github.com/tendermint/tendermint/issues/5670 -persistent-peers-max-dial-period = "{{ .P2P.PersistentPeersMaxDialPeriod }}" - -# Time to wait before flushing messages out on the connection -# TODO: Remove once p2p refactor is complete -# ref: https:#github.com/tendermint/tendermint/issues/5670 -flush-throttle-timeout = "{{ .P2P.FlushThrottleTimeout }}" - -# Maximum size of a message packet payload, in bytes -# TODO: Remove once p2p refactor is complete -# ref: https:#github.com/tendermint/tendermint/issues/5670 -max-packet-msg-payload-size = {{ .P2P.MaxPacketMsgPayloadSize }} - -# Rate at which packets can be sent, in bytes/second -# TODO: Remove once p2p refactor is complete -# ref: https:#github.com/tendermint/tendermint/issues/5670 -send-rate = {{ .P2P.SendRate }} - -# Rate at which packets can be received, in bytes/second -# TODO: Remove once p2p refactor is complete -# ref: https:#github.com/tendermint/tendermint/issues/5670 -recv-rate = {{ .P2P.RecvRate }} - # Set true to enable the peer-exchange reactor pex = {{ .P2P.PexReactor }} diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 339ad7469..0ab7b7546 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -64,15 +64,11 @@ initialization of the connection. There are two methods for sending messages: func (m MConnection) Send(chID byte, msgBytes []byte) bool {} - func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {} `Send(chID, msgBytes)` is a blocking call that waits until `msg` is successfully queued for the channel with the given id byte `chID`, or until the request times out. The message `msg` is serialized using Protobuf. -`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the -channel's queue is full. - Inbound message bytes are handled with an onReceive callback function. */ type MConnection struct { @@ -265,43 +261,6 @@ func (c *MConnection) stopServices() (alreadyStopped bool) { return false } -// FlushStop replicates the logic of OnStop. -// It additionally ensures that all successful -// .Send() calls will get flushed before closing -// the connection. -func (c *MConnection) FlushStop() { - if c.stopServices() { - return - } - - // this block is unique to FlushStop - { - // wait until the sendRoutine exits - // so we dont race on calling sendSomePacketMsgs - <-c.doneSendRoutine - - // Send and flush all pending msgs. - // Since sendRoutine has exited, we can call this - // safely - eof := c.sendSomePacketMsgs() - for !eof { - eof = c.sendSomePacketMsgs() - } - c.flush() - - // Now we can close the connection - } - - c.conn.Close() - - // We can't close pong safely here because - // recvRoutine may write to it after we've stopped. - // Though it doesn't need to get closed at all, - // we close it @ recvRoutine. - - // c.Stop() -} - // OnStop implements BaseService func (c *MConnection) OnStop() { if c.stopServices() { @@ -375,49 +334,6 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool { return success } -// Queues a message to be sent to channel. -// Nonblocking, returns true if successful. -func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool { - if !c.IsRunning() { - return false - } - - c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", msgBytes) - - // Send message to channel. - channel, ok := c.channelsIdx[chID] - if !ok { - c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID)) - return false - } - - ok = channel.trySendBytes(msgBytes) - if ok { - // Wake up sendRoutine if necessary - select { - case c.send <- struct{}{}: - default: - } - } - - return ok -} - -// CanSend returns true if you can send more data onto the chID, false -// otherwise. Use only as a heuristic. -func (c *MConnection) CanSend(chID byte) bool { - if !c.IsRunning() { - return false - } - - channel, ok := c.channelsIdx[chID] - if !ok { - c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID)) - return false - } - return channel.canSend() -} - // sendRoutine polls for packets to send from channels. func (c *MConnection) sendRoutine() { defer c._recover() @@ -682,13 +598,6 @@ func (c *MConnection) maxPacketMsgSize() int { return len(bz) } -type ConnectionStatus struct { - Duration time.Duration - SendMonitor flow.Status - RecvMonitor flow.Status - Channels []ChannelStatus -} - type ChannelStatus struct { ID byte SendQueueCapacity int @@ -697,24 +606,6 @@ type ChannelStatus struct { RecentlySent int64 } -func (c *MConnection) Status() ConnectionStatus { - var status ConnectionStatus - status.Duration = time.Since(c.created) - status.SendMonitor = c.sendMonitor.Status() - status.RecvMonitor = c.recvMonitor.Status() - status.Channels = make([]ChannelStatus, len(c.channels)) - for i, channel := range c.channels { - status.Channels[i] = ChannelStatus{ - ID: channel.desc.ID, - SendQueueCapacity: cap(channel.sendQueue), - SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)), - Priority: channel.desc.Priority, - RecentlySent: atomic.LoadInt64(&channel.recentlySent), - } - } - return status -} - //----------------------------------------------------------------------------- type ChannelDescriptor struct { @@ -800,30 +691,6 @@ func (ch *Channel) sendBytes(bytes []byte) bool { } } -// Queues message to send to this channel. -// Nonblocking, returns true if successful. -// Goroutine-safe -func (ch *Channel) trySendBytes(bytes []byte) bool { - select { - case ch.sendQueue <- bytes: - atomic.AddInt32(&ch.sendQueueSize, 1) - return true - default: - return false - } -} - -// Goroutine-safe -func (ch *Channel) loadSendQueueSize() (size int) { - return int(atomic.LoadInt32(&ch.sendQueueSize)) -} - -// Goroutine-safe -// Use only as a heuristic. -func (ch *Channel) canSend() bool { - return ch.loadSendQueueSize() < defaultSendQueueCapacity -} - // Returns true if any PacketMsgs are pending to be sent. // Call before calling nextPacketMsg() // Goroutine-safe diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index 6d009f85c..6a9c0988f 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -69,9 +69,6 @@ func TestMConnectionSendFlushStop(t *testing.T) { errCh <- err }() - // stop the conn - it should flush all conns - clientConn.FlushStop() - timer := time.NewTimer(3 * time.Second) select { case <-errCh: @@ -97,16 +94,14 @@ func TestMConnectionSend(t *testing.T) { if err != nil { t.Error(err) } - assert.True(t, mconn.CanSend(0x01)) msg = []byte("Spider-Man") - assert.True(t, mconn.TrySend(0x01, msg)) + assert.True(t, mconn.Send(0x01, msg)) _, err = server.Read(make([]byte, len(msg))) if err != nil { t.Error(err) } - assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown") assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown") } @@ -145,20 +140,6 @@ func TestMConnectionReceive(t *testing.T) { } } -func TestMConnectionStatus(t *testing.T) { - server, client := NetPipe() - t.Cleanup(closeAll(t, client, server)) - - mconn := createTestMConnection(client) - err := mconn.Start() - require.Nil(t, err) - t.Cleanup(stopAll(t, mconn)) - - status := mconn.Status() - assert.NotNil(t, status) - assert.Zero(t, status.Channels[0].SendQueueSize) -} - func TestMConnectionPongTimeoutResultsInError(t *testing.T) { server, client := net.Pipe() t.Cleanup(closeAll(t, client, server)) @@ -514,18 +495,15 @@ func TestMConnectionTrySend(t *testing.T) { msg := []byte("Semicolon-Woman") resultCh := make(chan string, 2) - assert.True(t, mconn.TrySend(0x01, msg)) + assert.True(t, mconn.Send(0x01, msg)) _, err = server.Read(make([]byte, len(msg))) require.NoError(t, err) - assert.True(t, mconn.CanSend(0x01)) - assert.True(t, mconn.TrySend(0x01, msg)) - assert.False(t, mconn.CanSend(0x01)) + assert.True(t, mconn.Send(0x01, msg)) go func() { - mconn.TrySend(0x01, msg) + mconn.Send(0x01, msg) resultCh <- "TrySend" }() - assert.False(t, mconn.CanSend(0x01)) - assert.False(t, mconn.TrySend(0x01, msg)) + assert.False(t, mconn.Send(0x01, msg)) assert.Equal(t, "TrySend", <-resultCh) } diff --git a/internal/p2p/conn_set.go b/internal/p2p/conn_set.go deleted file mode 100644 index 987d9f968..000000000 --- a/internal/p2p/conn_set.go +++ /dev/null @@ -1,82 +0,0 @@ -package p2p - -import ( - "net" - - tmsync "github.com/tendermint/tendermint/internal/libs/sync" -) - -// ConnSet is a lookup table for connections and all their ips. -type ConnSet interface { - Has(net.Conn) bool - HasIP(net.IP) bool - Set(net.Conn, []net.IP) - Remove(net.Conn) - RemoveAddr(net.Addr) -} - -type connSetItem struct { - conn net.Conn - ips []net.IP -} - -type connSet struct { - tmsync.RWMutex - - conns map[string]connSetItem -} - -// NewConnSet returns a ConnSet implementation. -func NewConnSet() ConnSet { - return &connSet{ - conns: map[string]connSetItem{}, - } -} - -func (cs *connSet) Has(c net.Conn) bool { - cs.RLock() - defer cs.RUnlock() - - _, ok := cs.conns[c.RemoteAddr().String()] - - return ok -} - -func (cs *connSet) HasIP(ip net.IP) bool { - cs.RLock() - defer cs.RUnlock() - - for _, c := range cs.conns { - for _, known := range c.ips { - if known.Equal(ip) { - return true - } - } - } - - return false -} - -func (cs *connSet) Remove(c net.Conn) { - cs.Lock() - defer cs.Unlock() - - delete(cs.conns, c.RemoteAddr().String()) -} - -func (cs *connSet) RemoveAddr(addr net.Addr) { - cs.Lock() - defer cs.Unlock() - - delete(cs.conns, addr.String()) -} - -func (cs *connSet) Set(c net.Conn, ips []net.IP) { - cs.Lock() - defer cs.Unlock() - - cs.conns[c.RemoteAddr().String()] = connSetItem{ - conn: c, - ips: ips, - } -} diff --git a/internal/p2p/mock/peer.go b/internal/p2p/mock/peer.go index cede51768..366f1d7fd 100644 --- a/internal/p2p/mock/peer.go +++ b/internal/p2p/mock/peer.go @@ -4,7 +4,6 @@ import ( "net" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/types" ) @@ -51,10 +50,10 @@ func (mp *Peer) NodeInfo() types.NodeInfo { ListenAddr: mp.addr.DialString(), } } -func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } -func (mp *Peer) ID() types.NodeID { return mp.id } -func (mp *Peer) IsOutbound() bool { return mp.Outbound } -func (mp *Peer) IsPersistent() bool { return mp.Persistent } + +func (mp *Peer) ID() types.NodeID { return mp.id } +func (mp *Peer) IsOutbound() bool { return mp.Outbound } +func (mp *Peer) IsPersistent() bool { return mp.Persistent } func (mp *Peer) Get(key string) interface{} { if value, ok := mp.kv[key]; ok { return value diff --git a/internal/p2p/mocks/connection.go b/internal/p2p/mocks/connection.go index 6c6174117..03f7b5b33 100644 --- a/internal/p2p/mocks/connection.go +++ b/internal/p2p/mocks/connection.go @@ -5,11 +5,8 @@ package mocks import ( context "context" - conn "github.com/tendermint/tendermint/internal/p2p/conn" - - crypto "github.com/tendermint/tendermint/crypto" - mock "github.com/stretchr/testify/mock" + crypto "github.com/tendermint/tendermint/crypto" p2p "github.com/tendermint/tendermint/internal/p2p" @@ -35,20 +32,6 @@ func (_m *Connection) Close() error { return r0 } -// FlushClose provides a mock function with given fields: -func (_m *Connection) FlushClose() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - // Handshake provides a mock function with given fields: _a0, _a1, _a2 func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) { ret := _m.Called(_a0, _a1, _a2) @@ -138,35 +121,14 @@ func (_m *Connection) RemoteEndpoint() p2p.Endpoint { } // SendMessage provides a mock function with given fields: _a0, _a1 -func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error) { +func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) error { ret := _m.Called(_a0, _a1) - var r0 bool - if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) bool); ok { + var r0 error + if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) error); ok { r0 = rf(_a0, _a1) } else { - r0 = ret.Get(0).(bool) - } - - var r1 error - if rf, ok := ret.Get(1).(func(p2p.ChannelID, []byte) error); ok { - r1 = rf(_a0, _a1) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Status provides a mock function with given fields: -func (_m *Connection) Status() conn.ConnectionStatus { - ret := _m.Called() - - var r0 conn.ConnectionStatus - if rf, ok := ret.Get(0).(func() conn.ConnectionStatus); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(conn.ConnectionStatus) + r0 = ret.Error(0) } return r0 @@ -185,24 +147,3 @@ func (_m *Connection) String() string { return r0 } - -// TrySendMessage provides a mock function with given fields: _a0, _a1 -func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error) { - ret := _m.Called(_a0, _a1) - - var r0 bool - if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) bool); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Get(0).(bool) - } - - var r1 error - if rf, ok := ret.Get(1).(func(p2p.ChannelID, []byte) error); ok { - r1 = rf(_a0, _a1) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} diff --git a/internal/p2p/mocks/peer.go b/internal/p2p/mocks/peer.go index b905c1156..3e5d20325 100644 --- a/internal/p2p/mocks/peer.go +++ b/internal/p2p/mocks/peer.go @@ -3,7 +3,6 @@ package mocks import ( - conn "github.com/tendermint/tendermint/internal/p2p/conn" log "github.com/tendermint/tendermint/libs/log" mock "github.com/stretchr/testify/mock" @@ -272,20 +271,6 @@ func (_m *Peer) Start() error { return r0 } -// Status provides a mock function with given fields: -func (_m *Peer) Status() conn.ConnectionStatus { - ret := _m.Called() - - var r0 conn.ConnectionStatus - if rf, ok := ret.Get(0).(func() conn.ConnectionStatus); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(conn.ConnectionStatus) - } - - return r0 -} - // Stop provides a mock function with given fields: func (_m *Peer) Stop() error { ret := _m.Called() diff --git a/internal/p2p/router.go b/internal/p2p/router.go index ef1eef564..ed6119431 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -960,8 +960,7 @@ func (r *Router) sendPeer(peerID types.NodeID, conn Connection, peerQueue queue) continue } - _, err = conn.SendMessage(envelope.channelID, bz) - if err != nil { + if err = conn.SendMessage(envelope.channelID, bz); err != nil { return err } diff --git a/internal/p2p/shim.go b/internal/p2p/shim.go index c9cdc2756..9f82bc11a 100644 --- a/internal/p2p/shim.go +++ b/internal/p2p/shim.go @@ -4,8 +4,6 @@ import ( "sort" "github.com/gogo/protobuf/proto" - "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/libs/log" ) @@ -74,17 +72,6 @@ func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim { } } -// MConnConfig returns an MConnConfig based on the defaults, with fields updated -// from the P2PConfig. -func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig { - mConfig := conn.DefaultMConnConfig() - mConfig.FlushThrottle = cfg.FlushThrottleTimeout - mConfig.SendRate = cfg.SendRate - mConfig.RecvRate = cfg.RecvRate - mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize - return mConfig -} - // GetChannels implements the legacy Reactor interface for getting a slice of all // the supported ChannelDescriptors. func (rs *ReactorShim) GetChannels() []*ChannelDescriptor { diff --git a/internal/p2p/transport.go b/internal/p2p/transport.go index 0b2311fa3..8440889d4 100644 --- a/internal/p2p/transport.go +++ b/internal/p2p/transport.go @@ -7,7 +7,6 @@ import ( "net" "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/types" ) @@ -82,19 +81,7 @@ type Connection interface { ReceiveMessage() (ChannelID, []byte, error) // SendMessage sends a message on the connection. Returns io.EOF if closed. - // - // FIXME: For compatibility with the legacy P2P stack, it returns an - // additional boolean false if the message timed out waiting to be accepted - // into the send buffer. This should be removed. - SendMessage(ChannelID, []byte) (bool, error) - - // TrySendMessage is a non-blocking version of SendMessage that returns - // immediately if the message buffer is full. It returns true if the message - // was accepted. - // - // FIXME: This method is here for backwards-compatibility with the legacy - // P2P stack and should be removed. - TrySendMessage(ChannelID, []byte) (bool, error) + SendMessage(ChannelID, []byte) error // LocalEndpoint returns the local endpoint for the connection. LocalEndpoint() Endpoint @@ -105,18 +92,6 @@ type Connection interface { // Close closes the connection. Close() error - // FlushClose flushes all pending sends and then closes the connection. - // - // FIXME: This only exists for backwards-compatibility with the current - // MConnection implementation. There should really be a separate Flush() - // method, but there is no easy way to synchronously flush pending data with - // the current MConnection code. - FlushClose() error - - // Status returns the current connection status. - // FIXME: Only here for compatibility with the current Peer code. - Status() conn.ConnectionStatus - // Stringer is used to display the connection, e.g. in logs. // // Without this, the logger may use reflection to access and display diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index eca261476..de643bdb9 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -377,32 +377,21 @@ func (c *mConnConnection) String() string { } // SendMessage implements Connection. -func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) (bool, error) { +func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) error { if chID > math.MaxUint8 { - return false, fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID) + return fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID) } select { case err := <-c.errorCh: - return false, err + return err case <-c.closeCh: - return false, io.EOF + return io.EOF default: - return c.mconn.Send(byte(chID), msg), nil - } -} + if ok := c.mconn.Send(byte(chID), msg); !ok { + return errors.New("sending message timed out") + } -// TrySendMessage implements Connection. -func (c *mConnConnection) TrySendMessage(chID ChannelID, msg []byte) (bool, error) { - if chID > math.MaxUint8 { - return false, fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID) - } - select { - case err := <-c.errorCh: - return false, err - case <-c.closeCh: - return false, io.EOF - default: - return c.mconn.TrySend(byte(chID), msg), nil + return nil } } @@ -442,14 +431,6 @@ func (c *mConnConnection) RemoteEndpoint() Endpoint { return endpoint } -// Status implements Connection. -func (c *mConnConnection) Status() conn.ConnectionStatus { - if c.mconn == nil { - return conn.ConnectionStatus{} - } - return c.mconn.Status() -} - // Close implements Connection. func (c *mConnConnection) Close() error { var err error @@ -463,17 +444,3 @@ func (c *mConnConnection) Close() error { }) return err } - -// FlushClose implements Connection. -func (c *mConnConnection) FlushClose() error { - var err error - c.closeOnce.Do(func() { - if c.mconn != nil && c.mconn.IsRunning() { - c.mconn.FlushStop() - } else { - err = c.conn.Close() - } - close(c.closeCh) - }) - return err -} diff --git a/internal/p2p/transport_mconn_test.go b/internal/p2p/transport_mconn_test.go index 06cd93c0a..f4d7198ed 100644 --- a/internal/p2p/transport_mconn_test.go +++ b/internal/p2p/transport_mconn_test.go @@ -195,7 +195,6 @@ func TestMConnTransport_Listen(t *testing.T) { _ = conn.Close() <-dialedChan - time.Sleep(time.Minute) // closing the connection should not error require.NoError(t, peerConn.Close()) diff --git a/internal/p2p/transport_memory.go b/internal/p2p/transport_memory.go index 09a387254..853288c59 100644 --- a/internal/p2p/transport_memory.go +++ b/internal/p2p/transport_memory.go @@ -10,7 +10,6 @@ import ( "github.com/tendermint/tendermint/crypto" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -262,11 +261,6 @@ func (c *MemoryConnection) RemoteEndpoint() Endpoint { } } -// Status implements Connection. -func (c *MemoryConnection) Status() conn.ConnectionStatus { - return conn.ConnectionStatus{} -} - // Handshake implements Connection. func (c *MemoryConnection) Handshake( ctx context.Context, @@ -316,42 +310,21 @@ func (c *MemoryConnection) ReceiveMessage() (ChannelID, []byte, error) { } // SendMessage implements Connection. -func (c *MemoryConnection) SendMessage(chID ChannelID, msg []byte) (bool, error) { +func (c *MemoryConnection) SendMessage(chID ChannelID, msg []byte) error { // Check close first, since channels are buffered. Otherwise, below select // may non-deterministically return non-error even when closed. select { case <-c.closer.Done(): - return false, io.EOF + return io.EOF default: } select { case c.sendCh <- memoryMessage{channelID: chID, message: msg}: c.logger.Debug("sent message", "chID", chID, "msg", msg) - return true, nil + return nil case <-c.closer.Done(): - return false, io.EOF - } -} - -// TrySendMessage implements Connection. -func (c *MemoryConnection) TrySendMessage(chID ChannelID, msg []byte) (bool, error) { - // Check close first, since channels are buffered. Otherwise, below select - // may non-deterministically return non-error even when closed. - select { - case <-c.closer.Done(): - return false, io.EOF - default: - } - - select { - case c.sendCh <- memoryMessage{channelID: chID, message: msg}: - c.logger.Debug("sent message", "chID", chID, "msg", msg) - return true, nil - case <-c.closer.Done(): - return false, io.EOF - default: - return false, nil + return io.EOF } } @@ -366,8 +339,3 @@ func (c *MemoryConnection) Close() error { } return nil } - -// FlushClose implements Connection. -func (c *MemoryConnection) FlushClose() error { - return c.Close() -} diff --git a/internal/p2p/transport_test.go b/internal/p2p/transport_test.go index 1b8ab77f5..cdfb57c70 100644 --- a/internal/p2p/transport_test.go +++ b/internal/p2p/transport_test.go @@ -315,22 +315,16 @@ func TestConnection_FlushClose(t *testing.T) { b := makeTransport(t) ab, _ := dialAcceptHandshake(t, a, b) - // FIXME: FlushClose should be removed (and replaced by separate Flush - // and Close calls if necessary). We can't reliably test it, so we just - // make sure it closes both ends and that it's idempotent. - err := ab.FlushClose() + err := ab.Close() require.NoError(t, err) _, _, err = ab.ReceiveMessage() require.Error(t, err) require.Equal(t, io.EOF, err) - _, err = ab.SendMessage(chID, []byte("closed")) + err = ab.SendMessage(chID, []byte("closed")) require.Error(t, err) require.Equal(t, io.EOF, err) - - err = ab.FlushClose() - require.NoError(t, err) }) } @@ -355,9 +349,8 @@ func TestConnection_SendReceive(t *testing.T) { ab, ba := dialAcceptHandshake(t, a, b) // Can send and receive a to b. - ok, err := ab.SendMessage(chID, []byte("foo")) + err := ab.SendMessage(chID, []byte("foo")) require.NoError(t, err) - require.True(t, ok) ch, msg, err := ba.ReceiveMessage() require.NoError(t, err) @@ -365,30 +358,20 @@ func TestConnection_SendReceive(t *testing.T) { require.Equal(t, chID, ch) // Can send and receive b to a. - _, err = ba.SendMessage(chID, []byte("bar")) + err = ba.SendMessage(chID, []byte("bar")) require.NoError(t, err) _, msg, err = ab.ReceiveMessage() require.NoError(t, err) require.Equal(t, []byte("bar"), msg) - // TrySendMessage also works. - ok, err = ba.TrySendMessage(chID, []byte("try")) - require.NoError(t, err) - require.True(t, ok) - - ch, msg, err = ab.ReceiveMessage() - require.NoError(t, err) - require.Equal(t, []byte("try"), msg) - require.Equal(t, chID, ch) - // Connections should still be active after closing the transports. err = a.Close() require.NoError(t, err) err = b.Close() require.NoError(t, err) - _, err = ab.SendMessage(chID, []byte("still here")) + err = ab.SendMessage(chID, []byte("still here")) require.NoError(t, err) ch, msg, err = ba.ReceiveMessage() require.NoError(t, err) @@ -403,34 +386,18 @@ func TestConnection_SendReceive(t *testing.T) { _, _, err = ab.ReceiveMessage() require.Error(t, err) require.Equal(t, io.EOF, err) - _, err = ab.TrySendMessage(chID, []byte("closed try")) - require.Error(t, err) - require.Equal(t, io.EOF, err) - _, err = ab.SendMessage(chID, []byte("closed")) + + err = ab.SendMessage(chID, []byte("closed")) require.Error(t, err) require.Equal(t, io.EOF, err) _, _, err = ba.ReceiveMessage() require.Error(t, err) require.Equal(t, io.EOF, err) - _, err = ba.TrySendMessage(chID, []byte("closed try")) + + err = ba.SendMessage(chID, []byte("closed")) require.Error(t, err) require.Equal(t, io.EOF, err) - _, err = ba.SendMessage(chID, []byte("closed")) - require.Error(t, err) - require.Equal(t, io.EOF, err) - }) -} - -func TestConnection_Status(t *testing.T) { - withTransports(t, func(t *testing.T, makeTransport transportFactory) { - a := makeTransport(t) - b := makeTransport(t) - ab, _ := dialAcceptHandshake(t, a, b) - - // FIXME: This isn't implemented in all transports, so for now we just - // check that it doesn't panic, which isn't really much of a test. - ab.Status() }) } diff --git a/internal/p2p/types.go b/internal/p2p/types.go index 403f43528..388ff2253 100644 --- a/internal/p2p/types.go +++ b/internal/p2p/types.go @@ -5,4 +5,3 @@ import ( ) type ChannelDescriptor = conn.ChannelDescriptor -type ConnectionStatus = conn.ConnectionStatus diff --git a/node/node.go b/node/node.go index 891c5fce6..eeb301614 100644 --- a/node/node.go +++ b/node/node.go @@ -1043,10 +1043,6 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt QueueType: conf.P2P.QueueType, } - if conf.P2P.MaxNumInboundPeers > 0 { - opts.MaxIncomingConnectionAttempts = conf.P2P.MaxIncomingConnectionAttempts - } - if conf.FilterPeers && proxyApp != nil { opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error { res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ diff --git a/node/setup.go b/node/setup.go index 8889edc4e..893d7ccde 100644 --- a/node/setup.go +++ b/node/setup.go @@ -3,7 +3,6 @@ package node import ( "bytes" "fmt" - "math" "time" dbm "github.com/tendermint/tm-db" @@ -18,6 +17,7 @@ import ( mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0" mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1" "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/internal/p2p/pex" "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" @@ -344,11 +344,9 @@ func createConsensusReactor( func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport { return p2p.NewMConnTransport( - logger, p2p.MConnConfig(cfg.P2P), []*p2p.ChannelDescriptor{}, + logger, conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{}, p2p.MConnTransportOptions{ - MaxAcceptedConnections: uint32(cfg.P2P.MaxNumInboundPeers + - len(tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ")), - ), + MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections), }, ) } @@ -365,20 +363,6 @@ func createPeerManager( switch { case cfg.P2P.MaxConnections > 0: maxConns = cfg.P2P.MaxConnections - - case cfg.P2P.MaxNumInboundPeers > 0 && cfg.P2P.MaxNumOutboundPeers > 0: - x := cfg.P2P.MaxNumInboundPeers + cfg.P2P.MaxNumOutboundPeers - if x > math.MaxUint16 { - return nil, fmt.Errorf( - "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)", - cfg.P2P.MaxNumInboundPeers, - cfg.P2P.MaxNumOutboundPeers, - math.MaxUint16, - ) - } - - maxConns = uint16(x) - default: maxConns = 64 } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 289d947a9..58e48dbba 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -79,7 +79,7 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) ( select { case event := <-eventCh: - return event.Data.(types.TMEventData), nil + return event.Data, nil case <-ctx.Done(): return nil, errors.New("timed out waiting for event") } diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index 3b1184e9c..9bf76c874 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -237,7 +237,6 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657" cfg.RPC.PprofListenAddress = ":6060" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) - cfg.P2P.AddrBookStrict = false cfg.P2P.QueueType = node.QueueType cfg.DBBackend = node.Database cfg.StateSync.DiscoveryTime = 5 * time.Second