From 4e355d80c4b44031150e1934e6f4b85ada7b8139 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 3 Dec 2021 15:19:04 -0500 Subject: [PATCH] p2p: implement interface for p2p.Channel without channels (#7378) --- internal/p2p/channel.go | 216 ++++++++++++++++++++++++++++++++++ internal/p2p/channel_test.go | 221 +++++++++++++++++++++++++++++++++++ internal/p2p/router.go | 75 ------------ 3 files changed, 437 insertions(+), 75 deletions(-) create mode 100644 internal/p2p/channel.go create mode 100644 internal/p2p/channel_test.go diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go new file mode 100644 index 000000000..2143589fd --- /dev/null +++ b/internal/p2p/channel.go @@ -0,0 +1,216 @@ +package p2p + +import ( + "context" + "fmt" + "sync" + + "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/types" +) + +// Envelope contains a message with sender/receiver routing info. +type Envelope struct { + From types.NodeID // sender (empty if outbound) + To types.NodeID // receiver (empty if inbound) + Broadcast bool // send to all connected peers (ignores To) + Message proto.Message // message payload + + // channelID is for internal Router use, set on outbound messages to inform + // the sendPeer() goroutine which transport channel to use. + // + // FIXME: If we migrate the Transport API to a byte-oriented multi-stream + // API, this will no longer be necessary since each channel will be mapped + // onto a stream during channel/peer setup. See: + // https://github.com/tendermint/spec/pull/227 + channelID ChannelID +} + +// Wrapper is a Protobuf message that can contain a variety of inner messages +// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the +// Router will automatically wrap outbound messages and unwrap inbound messages, +// such that reactors do not have to do this themselves. +type Wrapper interface { + proto.Message + + // Wrap will take a message and wrap it in this one if possible. + Wrap(proto.Message) error + + // Unwrap will unwrap the inner message contained in this message. + Unwrap() (proto.Message, error) +} + +// PeerError is a peer error reported via Channel.Error. +// +// FIXME: This currently just disconnects the peer, which is too simplistic. +// For example, some errors should be logged, some should cause disconnects, +// and some should ban the peer. +// +// FIXME: This should probably be replaced by a more general PeerBehavior +// concept that can mark good and bad behavior and contributes to peer scoring. +// It should possibly also allow reactors to request explicit actions, e.g. +// disconnection or banning, in addition to doing this based on aggregates. +type PeerError struct { + NodeID types.NodeID + Err error +} + +func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) } +func (pe PeerError) Unwrap() error { return pe.Err } + +// Channel is a bidirectional channel to exchange Protobuf messages with peers. +// Each message is wrapped in an Envelope to specify its sender and receiver. +type Channel struct { + ID ChannelID + In <-chan Envelope // inbound messages (peers to reactors) + Out chan<- Envelope // outbound messages (reactors to peers) + Error chan<- PeerError // peer error reporting + + messageType proto.Message // the channel's message type, used for unmarshaling +} + +// NewChannel creates a new channel. It is primarily for internal and test +// use, reactors should use Router.OpenChannel(). +func NewChannel( + id ChannelID, + messageType proto.Message, + inCh <-chan Envelope, + outCh chan<- Envelope, + errCh chan<- PeerError, +) *Channel { + return &Channel{ + ID: id, + messageType: messageType, + In: inCh, + Out: outCh, + Error: errCh, + } +} + +// Send blocks until the envelope has been sent, or until ctx ends. +// An error only occurs if the context ends before the send completes. +func (ch *Channel) Send(ctx context.Context, envelope Envelope) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch.Out <- envelope: + return nil + } +} + +// SendError blocks until the given error has been sent, or ctx ends. +// An error only occurs if the context ends before the send completes. +func (ch *Channel) SendError(ctx context.Context, pe PeerError) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch.Error <- pe: + return nil + } +} + +// Receive returns a new unbuffered iterator to receive messages from ch. +// The iterator runs until ctx ends. +func (ch *Channel) Receive(ctx context.Context) *ChannelIterator { + iter := &ChannelIterator{ + pipe: make(chan Envelope), // unbuffered + } + go func() { + defer close(iter.pipe) + iteratorWorker(ctx, ch, iter.pipe) + }() + return iter +} + +// ChannelIterator provides a context-aware path for callers +// (reactors) to process messages from the P2P layer without relying +// on the implementation details of the P2P layer. Channel provides +// access to it's Outbound stream as an iterator, and the +// MergedChannelIterator makes it possible to combine multiple +// channels into a single iterator. +type ChannelIterator struct { + pipe chan Envelope + current *Envelope +} + +func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) { + for { + select { + case <-ctx.Done(): + return + case envelope := <-ch.In: + select { + case <-ctx.Done(): + return + case pipe <- envelope: + } + } + } +} + +// Next returns true when the Envelope value has advanced, and false +// when the context is canceled or iteration should stop. If an iterator has returned false, +// it will never return true again. +// in general, use Next, as in: +// +// for iter.Next(ctx) { +// envelope := iter.Envelope() +// // ... do things ... +// } +// +func (iter *ChannelIterator) Next(ctx context.Context) bool { + select { + case <-ctx.Done(): + iter.current = nil + return false + case envelope, ok := <-iter.pipe: + if !ok { + iter.current = nil + return false + } + + iter.current = &envelope + + return true + } +} + +// Envelope returns the current Envelope object held by the +// iterator. When the last call to Next returned true, Envelope will +// return a non-nil object. If Next returned false then Envelope is +// always nil. +func (iter *ChannelIterator) Envelope() *Envelope { return iter.current } + +// MergedChannelIterator produces an iterator that merges the +// messages from the given channels in arbitrary order. +// +// This allows the caller to consume messages from multiple channels +// without needing to manage the concurrency separately. +func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator { + iter := &ChannelIterator{ + pipe: make(chan Envelope), // unbuffered + } + wg := new(sync.WaitGroup) + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + go func() { + defer close(iter.pipe) + // we could return early if the context is canceled, + // but this is safer because it means the pipe stays + // open until all of the ch worker threads end, which + // should happen very quickly. + <-done + }() + + for _, ch := range chs { + wg.Add(1) + go func(ch *Channel) { + defer wg.Done() + iteratorWorker(ctx, ch, iter.pipe) + }(ch) + } + + return iter +} diff --git a/internal/p2p/channel_test.go b/internal/p2p/channel_test.go new file mode 100644 index 000000000..0e2d7ea7c --- /dev/null +++ b/internal/p2p/channel_test.go @@ -0,0 +1,221 @@ +package p2p + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/require" +) + +type channelInternal struct { + In chan Envelope + Out chan Envelope + Error chan PeerError +} + +func testChannel(size int) (*channelInternal, *Channel) { + in := &channelInternal{ + In: make(chan Envelope, size), + Out: make(chan Envelope, size), + Error: make(chan PeerError, size), + } + ch := &Channel{ + In: in.In, + Out: in.Out, + Error: in.Error, + } + return in, ch +} + +func TestChannel(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + bctx, bcancel := context.WithCancel(context.Background()) + defer bcancel() + + testCases := []struct { + Name string + Case func(context.Context, *testing.T) + }{ + { + Name: "Send", + Case: func(ctx context.Context, t *testing.T) { + ins, ch := testChannel(1) + require.NoError(t, ch.Send(ctx, Envelope{From: "kip", To: "merlin"})) + + res, ok := <-ins.Out + require.True(t, ok) + require.EqualValues(t, "kip", res.From) + require.EqualValues(t, "merlin", res.To) + }, + }, + { + Name: "SendError", + Case: func(ctx context.Context, t *testing.T) { + ins, ch := testChannel(1) + require.NoError(t, ch.SendError(ctx, PeerError{NodeID: "kip", Err: errors.New("merlin")})) + + res, ok := <-ins.Error + require.True(t, ok) + require.EqualValues(t, "kip", res.NodeID) + require.EqualValues(t, "merlin", res.Err.Error()) + }, + }, + { + Name: "SendWithCanceledContext", + Case: func(ctx context.Context, t *testing.T) { + _, ch := testChannel(0) + cctx, ccancel := context.WithCancel(ctx) + ccancel() + require.Error(t, ch.Send(cctx, Envelope{From: "kip", To: "merlin"})) + }, + }, + { + Name: "SendErrorWithCanceledContext", + Case: func(ctx context.Context, t *testing.T) { + _, ch := testChannel(0) + cctx, ccancel := context.WithCancel(ctx) + ccancel() + + require.Error(t, ch.SendError(cctx, PeerError{NodeID: "kip", Err: errors.New("merlin")})) + }, + }, + { + Name: "ReceiveEmptyIteratorBlocks", + Case: func(ctx context.Context, t *testing.T) { + _, ch := testChannel(1) + iter := ch.Receive(ctx) + require.NotNil(t, iter) + out := make(chan bool) + go func() { + defer close(out) + select { + case <-ctx.Done(): + case out <- iter.Next(ctx): + } + }() + select { + case <-time.After(10 * time.Millisecond): + case <-out: + require.Fail(t, "iterator should not advance") + } + require.Nil(t, iter.Envelope()) + }, + }, + { + Name: "ReceiveWithData", + Case: func(ctx context.Context, t *testing.T) { + ins, ch := testChannel(1) + ins.In <- Envelope{From: "kip", To: "merlin"} + iter := ch.Receive(ctx) + require.NotNil(t, iter) + require.True(t, iter.Next(ctx)) + + res := iter.Envelope() + require.EqualValues(t, "kip", res.From) + require.EqualValues(t, "merlin", res.To) + }, + }, + { + Name: "ReceiveWithCanceledContext", + Case: func(ctx context.Context, t *testing.T) { + _, ch := testChannel(0) + cctx, ccancel := context.WithCancel(ctx) + ccancel() + + iter := ch.Receive(cctx) + require.NotNil(t, iter) + require.False(t, iter.Next(cctx)) + require.Nil(t, iter.Envelope()) + }, + }, + { + Name: "IteratorWithCanceledContext", + Case: func(ctx context.Context, t *testing.T) { + _, ch := testChannel(0) + + iter := ch.Receive(ctx) + require.NotNil(t, iter) + + cctx, ccancel := context.WithCancel(ctx) + ccancel() + require.False(t, iter.Next(cctx)) + require.Nil(t, iter.Envelope()) + }, + }, + { + Name: "IteratorCanceledAfterFirstUseBecomesNil", + Case: func(ctx context.Context, t *testing.T) { + ins, ch := testChannel(1) + + ins.In <- Envelope{From: "kip", To: "merlin"} + iter := ch.Receive(ctx) + require.NotNil(t, iter) + + require.True(t, iter.Next(ctx)) + + res := iter.Envelope() + require.EqualValues(t, "kip", res.From) + require.EqualValues(t, "merlin", res.To) + + cctx, ccancel := context.WithCancel(ctx) + ccancel() + + require.False(t, iter.Next(cctx)) + require.Nil(t, iter.Envelope()) + }, + }, + { + Name: "IteratorMultipleNextCalls", + Case: func(ctx context.Context, t *testing.T) { + ins, ch := testChannel(1) + + ins.In <- Envelope{From: "kip", To: "merlin"} + iter := ch.Receive(ctx) + require.NotNil(t, iter) + + require.True(t, iter.Next(ctx)) + + res := iter.Envelope() + require.EqualValues(t, "kip", res.From) + require.EqualValues(t, "merlin", res.To) + + res1 := iter.Envelope() + require.Equal(t, res, res1) + }, + }, + { + Name: "IteratorProducesNilObjectBeforeNext", + Case: func(ctx context.Context, t *testing.T) { + ins, ch := testChannel(1) + + iter := ch.Receive(ctx) + require.NotNil(t, iter) + require.Nil(t, iter.Envelope()) + + ins.In <- Envelope{From: "kip", To: "merlin"} + require.NotNil(t, iter) + require.True(t, iter.Next(ctx)) + + res := iter.Envelope() + require.NotNil(t, res) + require.EqualValues(t, "kip", res.From) + require.EqualValues(t, "merlin", res.To) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + + ctx, cancel := context.WithCancel(bctx) + defer cancel() + + tc.Case(ctx, t) + }) + } +} diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 09397e974..10f996442 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -21,81 +21,6 @@ import ( const queueBufferDefault = 32 -// Envelope contains a message with sender/receiver routing info. -type Envelope struct { - From types.NodeID // sender (empty if outbound) - To types.NodeID // receiver (empty if inbound) - Broadcast bool // send to all connected peers (ignores To) - Message proto.Message // message payload - - // channelID is for internal Router use, set on outbound messages to inform - // the sendPeer() goroutine which transport channel to use. - // - // FIXME: If we migrate the Transport API to a byte-oriented multi-stream - // API, this will no longer be necessary since each channel will be mapped - // onto a stream during channel/peer setup. See: - // https://github.com/tendermint/spec/pull/227 - channelID ChannelID -} - -// PeerError is a peer error reported via Channel.Error. -// -// FIXME: This currently just disconnects the peer, which is too simplistic. -// For example, some errors should be logged, some should cause disconnects, -// and some should ban the peer. -// -// FIXME: This should probably be replaced by a more general PeerBehavior -// concept that can mark good and bad behavior and contributes to peer scoring. -// It should possibly also allow reactors to request explicit actions, e.g. -// disconnection or banning, in addition to doing this based on aggregates. -type PeerError struct { - NodeID types.NodeID - Err error -} - -// Channel is a bidirectional channel to exchange Protobuf messages with peers, -// wrapped in Envelope to specify routing info (i.e. sender/receiver). -type Channel struct { - ID ChannelID - In <-chan Envelope // inbound messages (peers to reactors) - Out chan<- Envelope // outbound messages (reactors to peers) - Error chan<- PeerError // peer error reporting - - messageType proto.Message // the channel's message type, used for unmarshaling -} - -// NewChannel creates a new channel. It is primarily for internal and test -// use, reactors should use Router.OpenChannel(). -func NewChannel( - id ChannelID, - messageType proto.Message, - inCh <-chan Envelope, - outCh chan<- Envelope, - errCh chan<- PeerError, -) *Channel { - return &Channel{ - ID: id, - messageType: messageType, - In: inCh, - Out: outCh, - Error: errCh, - } -} - -// Wrapper is a Protobuf message that can contain a variety of inner messages -// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the -// Router will automatically wrap outbound messages and unwrap inbound messages, -// such that reactors do not have to do this themselves. -type Wrapper interface { - proto.Message - - // Wrap will take a message and wrap it in this one if possible. - Wrap(proto.Message) error - - // Unwrap will unwrap the inner message contained in this message. - Unwrap() (proto.Message, error) -} - // RouterOptions specifies options for a Router. type RouterOptions struct { // ResolveTimeout is the timeout for resolving NodeAddress URLs.