diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index 2e6e3e070..e0a69a3b2 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -1,16 +1,21 @@ package p2p import ( + "bufio" "context" "errors" "fmt" + "io" "sync" "github.com/gogo/protobuf/proto" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/tendermint/tendermint/internal/libs/protoio" "github.com/tendermint/tendermint/types" ) @@ -224,6 +229,7 @@ type libp2pChannelImpl struct { pubsub *pubsub.PubSub host host.Host topic *pubsub.Topic + recvCh chan Envelope chainID string wrapper Wrapper } @@ -233,7 +239,9 @@ func NewLibP2PChannel(chainID string, chDesc *ChannelDescriptor, ps *pubsub.PubS chDesc: chDesc, pubsub: ps, host: h, - chainID: chainID} + chainID: chainID, + recvCh: make(chan Envelope, chDesc.RecvMessageCapacity), + } topic, err := ps.Join(ch.canonicalizedTopicName()) if err != nil { return nil, err @@ -244,9 +252,6 @@ func NewLibP2PChannel(chainID string, chDesc *ChannelDescriptor, ps *pubsub.PubS ch.wrapper = w } - // TODO(tychoish) register handlers for - // request/response patterns - return ch, nil } @@ -259,17 +264,76 @@ func (ch *libp2pChannelImpl) canonicalizedTopicName() string { } func (ch *libp2pChannelImpl) Receive(ctx context.Context) *ChannelIterator { + // TODO: consider caching an iterator in the channel, or + // erroring if this gets called more than once. + // + // While it's safe to register a handler more than once, we + // could get into a dodgy situation where if you call receive + // more than once, the subsequently messages won't be routed + // correctly. iter := &ChannelIterator{ pipe: make(chan Envelope), } + ch.host.SetStreamHandler(protocol.ID(ch.canonicalizedTopicName()), func(stream network.Stream) { + // TODO: properly capture the max message size here. + reader := protoio.NewDelimitedReader(bufio.NewReader(stream), ch.chDesc.RecvBufferCapacity*2) + + remote := stream.Conn().RemotePeer() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { <-ctx.Done(); _ = stream.Close() }() + + for { + payload := proto.Clone(ch.chDesc.MessageType) + + if _, err := reader.ReadMsg(payload); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + // TODO: propagate or capture this error + } + select { + case <-ctx.Done(): + return + case iter.pipe <- Envelope{ + From: types.NodeID(remote), + Message: payload, + ChannelID: ch.chDesc.ID, + }: + } + } + }) + sub, err := ch.topic.Subscribe() if err != nil { return nil } + wg := &sync.WaitGroup{} + + wg.Add(1) go func() { - defer close(iter.pipe) + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case e := <-ch.recvCh: + select { + case <-ctx.Done(): + return + case iter.pipe <- e: + continue + } + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() for { msg, err := sub.Next(ctx) if err != nil { @@ -303,7 +367,12 @@ func (ch *libp2pChannelImpl) Receive(ctx context.Context) *ChannelIterator { } }() - return nil + // TODO: this is probably wrong in it's current form: the + // handler for point-to-point messages could still end up + // trying to send into the pipe after things close. + go func() { wg.Wait(); defer close(iter.pipe) }() + + return iter } func (ch *libp2pChannelImpl) Send(ctx context.Context, e Envelope) error { @@ -326,27 +395,22 @@ func (ch *libp2pChannelImpl) Send(ctx context.Context, e Envelope) error { return ch.topic.Publish(ctx, bz) } - // TODO: remove this, likely. Checking to see if a topic has a - // peer is *probably* right, but maybe it's better to just try - // and connect to a peer directly (using whatever method) and - // go from there. - if !ch.topicHasPeer(peer.ID(e.To)) { - return fmt.Errorf("peer %q does not exist", e.To) - } + switch ch.host.Network().Connectedness(peer.ID(e.To)) { + case network.CannotConnect: + return fmt.Errorf("cannot connect to %q", e.To) + default: + // TODO: should we should attempt to reuse between peers? + stream, err := ch.host.NewStream(ctx, peer.ID(e.To), protocol.ID(ch.canonicalizedTopicName())) + if err != nil { + return err + } - // TODO: there's likely some tooling that exists for doing - // point-to-point messaging that we can leverage here, rather - // than implementing directly on-top of libp2p streams. - return errors.New("direct messages between peers not supported, yet") -} - -func (ch *libp2pChannelImpl) topicHasPeer(id peer.ID) bool { - for _, peer := range ch.pubsub.ListPeers(ch.canonicalizedTopicName()) { - if peer == id { - return true + _, err = stream.Write(bz) + if err != nil { + return err } } - return false + return nil } func (ch *libp2pChannelImpl) SendError(ctx context.Context, pe PeerError) error { @@ -354,7 +418,8 @@ func (ch *libp2pChannelImpl) SendError(ctx context.Context, pe PeerError) error // shouldn't be handled as a property of the channel, and // rather as part of some peer-info/network-management // interface, but we can do it here for now, to ensure compatibility. - ch.pubsub.BlacklistPeer(peer.ID(pe.NodeID)) - - return nil + // + // Closing the peer is the same behavior as the legacy system, + // and seems less drastic than blacklisting the peer forever. + return ch.host.Network().ClosePeer(peer.ID(pe.NodeID)) }