p2p: simple request and response framework (#8500)

This commit is contained in:
Sam Kleinman
2022-05-11 14:43:54 -04:00
committed by GitHub
parent d30a1821cc
commit b7ccee6240

View File

@@ -1,16 +1,21 @@
package p2p
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/internal/libs/protoio"
"github.com/tendermint/tendermint/types"
)
@@ -224,6 +229,7 @@ type libp2pChannelImpl struct {
pubsub *pubsub.PubSub
host host.Host
topic *pubsub.Topic
recvCh chan Envelope
chainID string
wrapper Wrapper
}
@@ -233,7 +239,9 @@ func NewLibP2PChannel(chainID string, chDesc *ChannelDescriptor, ps *pubsub.PubS
chDesc: chDesc,
pubsub: ps,
host: h,
chainID: chainID}
chainID: chainID,
recvCh: make(chan Envelope, chDesc.RecvMessageCapacity),
}
topic, err := ps.Join(ch.canonicalizedTopicName())
if err != nil {
return nil, err
@@ -244,9 +252,6 @@ func NewLibP2PChannel(chainID string, chDesc *ChannelDescriptor, ps *pubsub.PubS
ch.wrapper = w
}
// TODO(tychoish) register handlers for
// request/response patterns
return ch, nil
}
@@ -259,17 +264,76 @@ func (ch *libp2pChannelImpl) canonicalizedTopicName() string {
}
func (ch *libp2pChannelImpl) Receive(ctx context.Context) *ChannelIterator {
// TODO: consider caching an iterator in the channel, or
// erroring if this gets called more than once.
//
// While it's safe to register a handler more than once, we
// could get into a dodgy situation where if you call receive
// more than once, the subsequently messages won't be routed
// correctly.
iter := &ChannelIterator{
pipe: make(chan Envelope),
}
ch.host.SetStreamHandler(protocol.ID(ch.canonicalizedTopicName()), func(stream network.Stream) {
// TODO: properly capture the max message size here.
reader := protoio.NewDelimitedReader(bufio.NewReader(stream), ch.chDesc.RecvBufferCapacity*2)
remote := stream.Conn().RemotePeer()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() { <-ctx.Done(); _ = stream.Close() }()
for {
payload := proto.Clone(ch.chDesc.MessageType)
if _, err := reader.ReadMsg(payload); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
// TODO: propagate or capture this error
}
select {
case <-ctx.Done():
return
case iter.pipe <- Envelope{
From: types.NodeID(remote),
Message: payload,
ChannelID: ch.chDesc.ID,
}:
}
}
})
sub, err := ch.topic.Subscribe()
if err != nil {
return nil
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer close(iter.pipe)
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case e := <-ch.recvCh:
select {
case <-ctx.Done():
return
case iter.pipe <- e:
continue
}
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
msg, err := sub.Next(ctx)
if err != nil {
@@ -303,7 +367,12 @@ func (ch *libp2pChannelImpl) Receive(ctx context.Context) *ChannelIterator {
}
}()
return nil
// TODO: this is probably wrong in it's current form: the
// handler for point-to-point messages could still end up
// trying to send into the pipe after things close.
go func() { wg.Wait(); defer close(iter.pipe) }()
return iter
}
func (ch *libp2pChannelImpl) Send(ctx context.Context, e Envelope) error {
@@ -326,27 +395,22 @@ func (ch *libp2pChannelImpl) Send(ctx context.Context, e Envelope) error {
return ch.topic.Publish(ctx, bz)
}
// TODO: remove this, likely. Checking to see if a topic has a
// peer is *probably* right, but maybe it's better to just try
// and connect to a peer directly (using whatever method) and
// go from there.
if !ch.topicHasPeer(peer.ID(e.To)) {
return fmt.Errorf("peer %q does not exist", e.To)
}
switch ch.host.Network().Connectedness(peer.ID(e.To)) {
case network.CannotConnect:
return fmt.Errorf("cannot connect to %q", e.To)
default:
// TODO: should we should attempt to reuse between peers?
stream, err := ch.host.NewStream(ctx, peer.ID(e.To), protocol.ID(ch.canonicalizedTopicName()))
if err != nil {
return err
}
// TODO: there's likely some tooling that exists for doing
// point-to-point messaging that we can leverage here, rather
// than implementing directly on-top of libp2p streams.
return errors.New("direct messages between peers not supported, yet")
}
func (ch *libp2pChannelImpl) topicHasPeer(id peer.ID) bool {
for _, peer := range ch.pubsub.ListPeers(ch.canonicalizedTopicName()) {
if peer == id {
return true
_, err = stream.Write(bz)
if err != nil {
return err
}
}
return false
return nil
}
func (ch *libp2pChannelImpl) SendError(ctx context.Context, pe PeerError) error {
@@ -354,7 +418,8 @@ func (ch *libp2pChannelImpl) SendError(ctx context.Context, pe PeerError) error
// shouldn't be handled as a property of the channel, and
// rather as part of some peer-info/network-management
// interface, but we can do it here for now, to ensure compatibility.
ch.pubsub.BlacklistPeer(peer.ID(pe.NodeID))
return nil
//
// Closing the peer is the same behavior as the legacy system,
// and seems less drastic than blacklisting the peer forever.
return ch.host.Network().ClosePeer(peer.ID(pe.NodeID))
}