diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index fd9bf4d7a..0bf0561d3 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -209,7 +209,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
 // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
 // It will handle errors and any possible panics gracefully. A caller can handle
 // any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
 	defer func() {
 		if e := recover(); e != nil {
 			err = fmt.Errorf("panic in processing message: %v", e)
@@ -223,7 +223,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 
 	r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
 
-	switch chID {
+	switch envelope.ChannelID {
 	case BlockSyncChannel:
 		switch msg := envelope.Message.(type) {
 		case *bcproto.BlockRequest:
@@ -260,7 +260,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 		}
 
 	default:
-		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
+		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
 	}
 
 	return err
@@ -275,12 +275,12 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
 	iter := blockSyncCh.Receive(ctx)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
-		if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil {
+		if err := r.handleMessage(ctx, envelope, blockSyncCh); err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				return
 			}
 
-			r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err)
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go
index 53e0b540d..eea74b5e1 100644
--- a/internal/consensus/reactor.go
+++ b/internal/consensus/reactor.go
@@ -1266,7 +1266,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
 // the p2p channel.
 //
 // NOTE: We block on consensus state for proposals, block parts, and votes.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans channelBundle) (err error) {
 	defer func() {
 		if e := recover(); e != nil {
 			err = fmt.Errorf("panic in processing message: %v", e)
@@ -1284,18 +1284,19 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 	// and because a large part of the core business logic depends on these
 	// domain types opposed to simply working with the Proto types.
 	protoMsg := new(tmcons.Message)
-	if err := protoMsg.Wrap(envelope.Message); err != nil {
+	if err = protoMsg.Wrap(envelope.Message); err != nil {
 		return err
 	}
 
-	msgI, err := MsgFromProto(protoMsg)
+	var msgI Message
+	msgI, err = MsgFromProto(protoMsg)
 	if err != nil {
 		return err
 	}
 
-	r.logger.Debug("received message", "ch_id", chID, "message", msgI, "peer", envelope.From)
+	r.logger.Debug("received message", "ch_id", envelope.ChannelID, "message", msgI, "peer", envelope.From)
 
-	switch chID {
+	switch envelope.ChannelID {
 	case StateChannel:
 		err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet)
 	case DataChannel:
@@ -1305,7 +1306,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 	case VoteSetBitsChannel:
 		err = r.handleVoteSetBitsMessage(ctx, envelope, msgI)
 	default:
-		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
+		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
 	}
 
 	return err
@@ -1320,8 +1321,8 @@ func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) {
 	iter := chans.state.Receive(ctx)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
-		if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil {
-			r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err)
+		if err := r.handleMessage(ctx, envelope, chans); err != nil {
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := chans.state.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
@@ -1341,8 +1342,8 @@ func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) {
 	iter := chans.data.Receive(ctx)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
-		if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil {
-			r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err)
+		if err := r.handleMessage(ctx, envelope, chans); err != nil {
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := chans.data.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
@@ -1362,8 +1363,8 @@ func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) {
 	iter := chans.vote.Receive(ctx)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
-		if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil {
-			r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err)
+		if err := r.handleMessage(ctx, envelope, chans); err != nil {
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := chans.vote.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
@@ -1384,12 +1385,12 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
 
-		if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil {
+		if err := r.handleMessage(ctx, envelope, chans); err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				return
 			}
 
-			r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err)
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := chans.votSet.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go
index 4809df32e..1d952d30e 100644
--- a/internal/evidence/reactor.go
+++ b/internal/evidence/reactor.go
@@ -133,7 +133,7 @@ func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envel
 // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
 // It will handle errors and any possible panics gracefully. A caller can handle
 // any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
 	defer func() {
 		if e := recover(); e != nil {
 			err = fmt.Errorf("panic in processing message: %v", e)
@@ -147,15 +147,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 
 	r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
 
-	switch chID {
+	switch envelope.ChannelID {
 	case EvidenceChannel:
 		err = r.handleEvidenceMessage(ctx, envelope)
-
 	default:
-		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
+		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
 	}
 
-	return err
+	return
 }
 
 // processEvidenceCh implements a blocking event loop where we listen for p2p
@@ -164,8 +163,8 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
 	iter := evidenceCh.Receive(ctx)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
-		if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil {
-			r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err)
+		if err := r.handleMessage(ctx, envelope); err != nil {
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := evidenceCh.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go
index 8f83f5006..ae578e70a 100644
--- a/internal/mempool/reactor.go
+++ b/internal/mempool/reactor.go
@@ -169,7 +169,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope *p2p.Envelo
 // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
 // It will handle errors and any possible panics gracefully. A caller can handle
 // any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
 	defer func() {
 		if e := recover(); e != nil {
 			r.observePanic(e)
@@ -184,15 +184,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 
 	r.logger.Debug("received message", "peer", envelope.From)
 
-	switch chID {
+	switch envelope.ChannelID {
 	case MempoolChannel:
 		err = r.handleMempoolMessage(ctx, envelope)
-
 	default:
-		err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message)
+		err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", envelope.ChannelID, envelope.Message)
 	}
 
-	return err
+	return
 }
 
 // processMempoolCh implements a blocking event loop where we listen for p2p
@@ -201,8 +200,8 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
 	iter := mempoolCh.Receive(ctx)
 	for iter.Next(ctx) {
 		envelope := iter.Envelope()
-		if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil {
-			r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err)
+		if err := r.handleMessage(ctx, envelope); err != nil {
+			r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := mempoolCh.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go
index f9f3ec40e..885e080d4 100644
--- a/internal/p2p/p2ptest/require.go
+++ b/internal/p2p/p2ptest/require.go
@@ -123,7 +123,7 @@ func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerE
 	err := channel.SendError(tctx, peerError)
 	switch {
 	case errors.Is(err, context.DeadlineExceeded):
-		require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID)
+		require.Fail(t, "timed out reporting error", "%v for %q", peerError, channel.String())
 	default:
 		require.NoError(t, err, "unexpected error")
 	}
diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go
index 1c80763ee..bd4737326 100644
--- a/internal/p2p/pex/reactor.go
+++ b/internal/p2p/pex/reactor.go
@@ -193,7 +193,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
 		dur, err := r.handlePexMessage(ctx, envelope, pexCh)
 		if err != nil {
 			r.logger.Error("failed to process message",
-				"ch_id", pexCh.ID, "envelope", envelope, "err", err)
+				"ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
 			if serr := pexCh.SendError(ctx, p2p.PeerError{
 				NodeID: envelope.From,
 				Err:    err,
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index fb2749a02..f4d72d017 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -305,7 +305,12 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 		return nil
 	}
 
-	go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
+	go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
+		SnapshotChannel:   snapshotCh,
+		ChunkChannel:      chunkCh,
+		LightBlockChannel: blockCh,
+		ParamsChannel:     paramsCh,
+	})
 	go r.processPeerUpdates(ctx, r.peerEvents(ctx))
 
 	if r.needsStateSync {
@@ -661,7 +666,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
 			"failed to add snapshot",
 			"height", msg.Height,
 			"format", msg.Format,
-			"channel", snapshotCh.ID,
+			"channel", envelope.ChannelID,
 			"err", err,
 		)
 		return nil
@@ -907,15 +912,14 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
 // encountered during message execution will result in a PeerError being sent on
 // the respective channel. When the reactor is stopped, we will catch the signal
 // and close the p2p Channel gracefully.
-func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
+func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
 	// make sure that the iterator gets cleaned up in case of error
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
-	for idx := range chs {
-		ch := chs[idx]
-		chanTable[ch.ID] = ch
+	chs := make([]*p2p.Channel, 0, len(chanTable))
+	for key := range chanTable {
+		chs = append(chs, chanTable[key])
 	}
 
 	iter := p2p.MergedChannelIterator(ctx, chs...)