mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-16 13:47:07 +00:00
p2p: avoid using p2p.Channel internals (#8444)
This commit is contained in:
@@ -209,7 +209,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -223,7 +223,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
|
||||
|
||||
r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
|
||||
|
||||
switch chID {
|
||||
switch envelope.ChannelID {
|
||||
case BlockSyncChannel:
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
@@ -260,7 +260,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
|
||||
}
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -275,12 +275,12 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
|
||||
iter := blockSyncCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil {
|
||||
if err := r.handleMessage(ctx, envelope, blockSyncCh); err != nil {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
}
|
||||
|
||||
r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err)
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
|
||||
@@ -1266,7 +1266,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
|
||||
// the p2p channel.
|
||||
//
|
||||
// NOTE: We block on consensus state for proposals, block parts, and votes.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans channelBundle) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -1284,18 +1284,19 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
|
||||
// and because a large part of the core business logic depends on these
|
||||
// domain types opposed to simply working with the Proto types.
|
||||
protoMsg := new(tmcons.Message)
|
||||
if err := protoMsg.Wrap(envelope.Message); err != nil {
|
||||
if err = protoMsg.Wrap(envelope.Message); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgI, err := MsgFromProto(protoMsg)
|
||||
var msgI Message
|
||||
msgI, err = MsgFromProto(protoMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.logger.Debug("received message", "ch_id", chID, "message", msgI, "peer", envelope.From)
|
||||
r.logger.Debug("received message", "ch_id", envelope.ChannelID, "message", msgI, "peer", envelope.From)
|
||||
|
||||
switch chID {
|
||||
switch envelope.ChannelID {
|
||||
case StateChannel:
|
||||
err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet)
|
||||
case DataChannel:
|
||||
@@ -1305,7 +1306,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
|
||||
case VoteSetBitsChannel:
|
||||
err = r.handleVoteSetBitsMessage(ctx, envelope, msgI)
|
||||
default:
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -1320,8 +1321,8 @@ func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) {
|
||||
iter := chans.state.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err)
|
||||
if err := r.handleMessage(ctx, envelope, chans); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := chans.state.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
@@ -1341,8 +1342,8 @@ func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) {
|
||||
iter := chans.data.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err)
|
||||
if err := r.handleMessage(ctx, envelope, chans); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := chans.data.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
@@ -1362,8 +1363,8 @@ func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) {
|
||||
iter := chans.vote.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err)
|
||||
if err := r.handleMessage(ctx, envelope, chans); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := chans.vote.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
@@ -1384,12 +1385,12 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
|
||||
if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil {
|
||||
if err := r.handleMessage(ctx, envelope, chans); err != nil {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
}
|
||||
|
||||
r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err)
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := chans.votSet.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
|
||||
@@ -133,7 +133,7 @@ func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envel
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -147,15 +147,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
|
||||
|
||||
r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
|
||||
|
||||
switch chID {
|
||||
switch envelope.ChannelID {
|
||||
case EvidenceChannel:
|
||||
err = r.handleEvidenceMessage(ctx, envelope)
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
|
||||
}
|
||||
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// processEvidenceCh implements a blocking event loop where we listen for p2p
|
||||
@@ -164,8 +163,8 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
|
||||
iter := evidenceCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err)
|
||||
if err := r.handleMessage(ctx, envelope); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := evidenceCh.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
|
||||
@@ -169,7 +169,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope *p2p.Envelo
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
r.observePanic(e)
|
||||
@@ -184,15 +184,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
|
||||
|
||||
r.logger.Debug("received message", "peer", envelope.From)
|
||||
|
||||
switch chID {
|
||||
switch envelope.ChannelID {
|
||||
case MempoolChannel:
|
||||
err = r.handleMempoolMessage(ctx, envelope)
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message)
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", envelope.ChannelID, envelope.Message)
|
||||
}
|
||||
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// processMempoolCh implements a blocking event loop where we listen for p2p
|
||||
@@ -201,8 +200,8 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
|
||||
iter := mempoolCh.Receive(ctx)
|
||||
for iter.Next(ctx) {
|
||||
envelope := iter.Envelope()
|
||||
if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err)
|
||||
if err := r.handleMessage(ctx, envelope); err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := mempoolCh.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
|
||||
@@ -123,7 +123,7 @@ func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerE
|
||||
err := channel.SendError(tctx, peerError)
|
||||
switch {
|
||||
case errors.Is(err, context.DeadlineExceeded):
|
||||
require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID)
|
||||
require.Fail(t, "timed out reporting error", "%v for %q", peerError, channel.String())
|
||||
default:
|
||||
require.NoError(t, err, "unexpected error")
|
||||
}
|
||||
|
||||
@@ -193,7 +193,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
|
||||
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to process message",
|
||||
"ch_id", pexCh.ID, "envelope", envelope, "err", err)
|
||||
"ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
|
||||
if serr := pexCh.SendError(ctx, p2p.PeerError{
|
||||
NodeID: envelope.From,
|
||||
Err: err,
|
||||
|
||||
@@ -305,7 +305,12 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
|
||||
SnapshotChannel: snapshotCh,
|
||||
ChunkChannel: chunkCh,
|
||||
LightBlockChannel: blockCh,
|
||||
ParamsChannel: paramsCh,
|
||||
})
|
||||
go r.processPeerUpdates(ctx, r.peerEvents(ctx))
|
||||
|
||||
if r.needsStateSync {
|
||||
@@ -661,7 +666,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
|
||||
"failed to add snapshot",
|
||||
"height", msg.Height,
|
||||
"format", msg.Format,
|
||||
"channel", snapshotCh.ID,
|
||||
"channel", envelope.ChannelID,
|
||||
"err", err,
|
||||
)
|
||||
return nil
|
||||
@@ -907,15 +912,14 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
|
||||
// encountered during message execution will result in a PeerError being sent on
|
||||
// the respective channel. When the reactor is stopped, we will catch the signal
|
||||
// and close the p2p Channel gracefully.
|
||||
func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
|
||||
// make sure that the iterator gets cleaned up in case of error
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
|
||||
for idx := range chs {
|
||||
ch := chs[idx]
|
||||
chanTable[ch.ID] = ch
|
||||
chs := make([]*p2p.Channel, 0, len(chanTable))
|
||||
for key := range chanTable {
|
||||
chs = append(chs, chanTable[key])
|
||||
}
|
||||
|
||||
iter := p2p.MergedChannelIterator(ctx, chs...)
|
||||
|
||||
Reference in New Issue
Block a user