Merge changes from master and resolve conflicts

Signed-off-by: Thane Thomson <connect@thanethomson.com>
This commit is contained in:
Thane Thomson
2022-04-30 14:37:13 -04:00
8 changed files with 69 additions and 59 deletions

View File

@@ -28,7 +28,7 @@ eg, L = latency = 0.1s
*/
const (
requestIntervalMS = 2
requestInterval = 2 * time.Millisecond
maxTotalRequesters = 600
maxPeerErrBuffer = 1000
maxPendingRequests = maxTotalRequesters
@@ -131,22 +131,22 @@ func (*BlockPool) OnStop() {}
// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine(ctx context.Context) {
for pool.IsRunning() {
_, numPending, lenRequesters := pool.GetStatus()
switch {
case numPending >= maxPendingRequests:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
case lenRequesters >= maxTotalRequesters:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
default:
// request for more blocks.
pool.makeNextRequester(ctx)
if ctx.Err() != nil {
return
}
_, numPending, lenRequesters := pool.GetStatus()
if numPending >= maxPendingRequests || lenRequesters >= maxTotalRequesters {
// This is preferable to using a timer because the request interval
// is so small. Larger request intervals may necessitate using a
// timer/ticker.
time.Sleep(requestInterval)
pool.removeTimedoutPeers()
continue
}
// request for more blocks.
pool.makeNextRequester(ctx)
}
}
@@ -655,9 +655,16 @@ OUTER_LOOP:
if !bpr.IsRunning() || !bpr.pool.IsRunning() {
return
}
if ctx.Err() != nil {
return
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
time.Sleep(requestIntervalMS * time.Millisecond)
// This is preferable to using a timer because the request
// interval is so small. Larger request intervals may
// necessitate using a timer/ticker.
time.Sleep(requestInterval)
continue PICK_PEER_LOOP
}
break PICK_PEER_LOOP

View File

@@ -218,7 +218,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -232,7 +232,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
switch chID {
switch envelope.ChannelID {
case BlockSyncChannel:
switch msg := envelope.Message.(type) {
case *bcproto.BlockRequest:
@@ -276,7 +276,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
}
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
return err
@@ -291,12 +291,12 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
iter := blockSyncCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil {
if err := r.handleMessage(ctx, envelope, blockSyncCh); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err)
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,

View File

@@ -1266,7 +1266,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
// the p2p channel.
//
// NOTE: We block on consensus state for proposals, block parts, and votes.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans channelBundle) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -1284,18 +1284,19 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// and because a large part of the core business logic depends on these
// domain types opposed to simply working with the Proto types.
protoMsg := new(tmcons.Message)
if err := protoMsg.Wrap(envelope.Message); err != nil {
if err = protoMsg.Wrap(envelope.Message); err != nil {
return err
}
msgI, err := MsgFromProto(protoMsg)
var msgI Message
msgI, err = MsgFromProto(protoMsg)
if err != nil {
return err
}
r.logger.Debug("received message", "ch_id", chID, "message", msgI, "peer", envelope.From)
r.logger.Debug("received message", "ch_id", envelope.ChannelID, "message", msgI, "peer", envelope.From)
switch chID {
switch envelope.ChannelID {
case StateChannel:
err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet)
case DataChannel:
@@ -1305,7 +1306,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
case VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msgI)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
return err
@@ -1320,8 +1321,8 @@ func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) {
iter := chans.state.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err)
if err := r.handleMessage(ctx, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := chans.state.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
@@ -1341,8 +1342,8 @@ func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) {
iter := chans.data.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err)
if err := r.handleMessage(ctx, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := chans.data.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
@@ -1362,8 +1363,8 @@ func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) {
iter := chans.vote.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err)
if err := r.handleMessage(ctx, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := chans.vote.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
@@ -1384,12 +1385,12 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil {
if err := r.handleMessage(ctx, envelope, chans); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err)
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := chans.votSet.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,

View File

@@ -133,7 +133,7 @@ func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envel
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -147,15 +147,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
switch chID {
switch envelope.ChannelID {
case EvidenceChannel:
err = r.handleEvidenceMessage(ctx, envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
return err
return
}
// processEvidenceCh implements a blocking event loop where we listen for p2p
@@ -164,8 +163,8 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
iter := evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err)
if err := r.handleMessage(ctx, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,

View File

@@ -169,7 +169,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope *p2p.Envelo
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
r.observePanic(e)
@@ -184,15 +184,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
r.logger.Debug("received message", "peer", envelope.From)
switch chID {
switch envelope.ChannelID {
case MempoolChannel:
err = r.handleMempoolMessage(ctx, envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message)
err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", envelope.ChannelID, envelope.Message)
}
return err
return
}
// processMempoolCh implements a blocking event loop where we listen for p2p
@@ -201,8 +200,8 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
iter := mempoolCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err)
if err := r.handleMessage(ctx, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,

View File

@@ -123,7 +123,7 @@ func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerE
err := channel.SendError(tctx, peerError)
switch {
case errors.Is(err, context.DeadlineExceeded):
require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID)
require.Fail(t, "timed out reporting error", "%v for %q", peerError, channel.String())
default:
require.NoError(t, err, "unexpected error")
}

View File

@@ -193,7 +193,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
if err != nil {
r.logger.Error("failed to process message",
"ch_id", pexCh.ID, "envelope", envelope, "err", err)
"ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,

View File

@@ -305,7 +305,12 @@ func (r *Reactor) OnStart(ctx context.Context) error {
return nil
}
go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
SnapshotChannel: snapshotCh,
ChunkChannel: chunkCh,
LightBlockChannel: blockCh,
ParamsChannel: paramsCh,
})
go r.processPeerUpdates(ctx, r.peerEvents(ctx))
if r.needsStateSync {
@@ -661,7 +666,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"failed to add snapshot",
"height", msg.Height,
"format", msg.Format,
"channel", snapshotCh.ID,
"channel", envelope.ChannelID,
"err", err,
)
return nil
@@ -907,15 +912,14 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
// make sure that the iterator gets cleaned up in case of error
ctx, cancel := context.WithCancel(ctx)
defer cancel()
chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
for idx := range chs {
ch := chs[idx]
chanTable[ch.ID] = ch
chs := make([]*p2p.Channel, 0, len(chanTable))
for key := range chanTable {
chs = append(chs, chanTable[key])
}
iter := p2p.MergedChannelIterator(ctx, chs...)