diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index eb96737ca..d60ec427d 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -37,6 +37,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [mempool] \#5673 Cancel `CheckTx` requests if RPC client disconnects or times out (@melekes) - [abci] \#5706 Added `AbciVersion` to `RequestInfo` allowing applications to check ABCI version when connecting to Tendermint. (@marbar3778) - [blockchain/v1] \#5728 Remove in favor of v2 (@melekes) +- [blockchain/v0] \#5741 Relax termination conditions and increase sync timeout (@melekes) ### BUG FIXES diff --git a/blockchain/doc.go b/blockchain/doc.go new file mode 100644 index 000000000..b5d9858d0 --- /dev/null +++ b/blockchain/doc.go @@ -0,0 +1,17 @@ +/* +Package blockchain provides two implementations of the fast-sync protocol. + +- v0 was the very first implementation. it's battle tested, but does not have a +lot of test coverage. +- v2 is the newest implementation, with a focus on testability and readability. + +Check out ADR-40 for the formal model and requirements. + +# Termination criteria + +1. the maximum peer height is reached +2. termination timeout is triggered, which is set if the peer set is empty or +there are no pending requests. + +*/ +package blockchain diff --git a/blockchain/v0/pool.go b/blockchain/v0/pool.go index 69e0b55c4..7d61438a0 100644 --- a/blockchain/v0/pool.go +++ b/blockchain/v0/pool.go @@ -58,12 +58,19 @@ var peerTimeout = 15 * time.Second // not const so we can override with tests are not at peer limits, we can probably switch to consensus reactor */ +// BlockRequest stores a block request identified by the block Height and the +// PeerID responsible for delivering the block. +type BlockRequest struct { + Height int64 + PeerID p2p.ID +} + // BlockPool keeps track of the fast sync peers, block requests and block responses. type BlockPool struct { service.BaseService - startTime time.Time + lastAdvance time.Time - mtx tmsync.Mutex + mtx tmsync.RWMutex // block requests requesters map[int64]*bpRequester height int64 // the lowest key in requesters. @@ -98,8 +105,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // OnStart implements service.Service by spawning requesters routine and recording // pool's start time. func (pool *BlockPool) OnStart() error { + pool.lastAdvance = time.Now() go pool.makeRequestersRoutine() - pool.startTime = time.Now() return nil } @@ -134,6 +141,7 @@ func (pool *BlockPool) removeTimedoutPeers() { defer pool.mtx.Unlock() for _, peer := range pool.peers { + // check if peer timed out if !peer.didTimeout && peer.numPending > 0 { curRate := peer.recvMonitor.Status().CurRate // curRate can be 0 on start @@ -147,6 +155,7 @@ func (pool *BlockPool) removeTimedoutPeers() { peer.didTimeout = true } } + if peer.didTimeout { pool.removePeer(peer.id) } @@ -156,33 +165,24 @@ func (pool *BlockPool) removeTimedoutPeers() { // GetStatus returns pool's height, numPending requests and the number of // requesters. func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) { - pool.mtx.Lock() - defer pool.mtx.Unlock() + pool.mtx.RLock() + defer pool.mtx.RUnlock() return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) } // IsCaughtUp returns true if this node is caught up, false - otherwise. -// TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { - pool.mtx.Lock() - defer pool.mtx.Unlock() + pool.mtx.RLock() + defer pool.mtx.RUnlock() // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { - pool.Logger.Debug("Blockpool has no peers") return false } - - // Some conditions to determine if we're caught up. - // Ensures we've either received a block or waited some amount of time, - // and that we're synced to the highest known height. - // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 + // NOTE: we use maxPeerHeight - 1 because to sync block H requires block H+1 // to verify the LastCommit. - receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second - ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) - isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers - return isCaughtUp + return pool.height >= (pool.maxPeerHeight - 1) } // PeekTwoBlocks returns blocks at pool.height and pool.height+1. @@ -190,8 +190,8 @@ func (pool *BlockPool) IsCaughtUp() bool { // So we peek two blocks at a time. // The caller will verify the commit. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - pool.mtx.Lock() - defer pool.mtx.Unlock() + pool.mtx.RLock() + defer pool.mtx.RUnlock() if r := pool.requesters[pool.height]; r != nil { first = r.getBlock() @@ -209,16 +209,12 @@ func (pool *BlockPool) PopRequest() { defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { - /* The block can disappear at any time, due to removePeer(). - if r := pool.requesters[pool.height]; r == nil || r.block == nil { - PanicSanity("PopRequest() requires a valid block") - } - */ if err := r.Stop(); err != nil { pool.Logger.Error("Error stopping requester", "err", err) } delete(pool.requesters, pool.height) pool.height++ + pool.lastAdvance = time.Now() } else { panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } @@ -248,14 +244,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int requester := pool.requesters[block.Height] if requester == nil { - pool.Logger.Info( - "peer sent us a block we didn't expect", - "peer", - peerID, - "curHeight", - pool.height, - "blockHeight", - block.Height) + pool.Logger.Error("peer sent us a block we didn't expect", + "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height) diff := pool.height - block.Height if diff < 0 { diff *= -1 @@ -273,18 +263,27 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int peer.decrPending(blockSize) } } else { - pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height) - pool.sendError(errors.New("invalid peer"), peerID) + err := errors.New("requester is different or block already exists") + pool.Logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", block.Height) + pool.sendError(err, peerID) } } // MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { - pool.mtx.Lock() - defer pool.mtx.Unlock() + pool.mtx.RLock() + defer pool.mtx.RUnlock() return pool.maxPeerHeight } +// LastAdvance returns the time when the last block was processed (or start +// time if no blocks were processed). +func (pool *BlockPool) LastAdvance() time.Time { + pool.mtx.RLock() + defer pool.mtx.RUnlock() + return pool.lastAdvance +} + // SetPeerRange sets the peer's alleged blockchain base and height. func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { pool.mtx.Lock() @@ -601,7 +600,6 @@ OUTER_LOOP: } peer = bpr.pool.pickIncrAvailablePeer(bpr.height) if peer == nil { - // log.Info("No peers available", "height", height) time.Sleep(requestIntervalMS * time.Millisecond) continue PICK_PEER_LOOP } @@ -638,10 +636,3 @@ OUTER_LOOP: } } } - -// BlockRequest stores a block request identified by the block Height and the PeerID responsible for -// delivering the block -type BlockRequest struct { - Height int64 - PeerID p2p.ID -} diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 0ffd14194..77f112180 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -28,6 +28,9 @@ const ( statusUpdateIntervalSeconds = 10 // check if we should switch to consensus reactor switchToConsensusIntervalSeconds = 1 + + // switch to consensus after this duration of inactivity + syncTimeout = 60 * time.Second ) type consensusReactor interface { @@ -158,7 +161,7 @@ func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { return } - peer.Send(BlockchainChannel, msgBytes) + _ = peer.Send(BlockchainChannel, msgBytes) // it's OK if send fails. will try later in poolRoutine // peer is added to the pool once we receive the first @@ -207,20 +210,22 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, // XXX: do not call any methods that can block or incur heavy processing. // https://github.com/tendermint/tendermint/issues/2888 func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + logger := bcR.Logger.With("src", src, "chId", chID) + msg, err := bc.DecodeMsg(msgBytes) if err != nil { - bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) + logger.Error("Error decoding message", "err", err) bcR.Switch.StopPeerForError(src, err) return } if err = bc.ValidateMsg(msg); err != nil { - bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + logger.Error("Peer sent us invalid msg", "msg", msg, "err", err) bcR.Switch.StopPeerForError(src, err) return } - bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) + logger.Debug("Receive", "msg", msg) switch msg := msg.(type) { case *bcproto.BlockRequest: @@ -228,7 +233,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) case *bcproto.BlockResponse: bi, err := types.BlockFromProto(msg.Block) if err != nil { - bcR.Logger.Error("Block content is invalid", "err", err) + logger.Error("Block content is invalid", "err", err) bcR.Switch.StopPeerForError(src, err) return } @@ -240,7 +245,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) Base: bcR.store.Base(), }) if err != nil { - bcR.Logger.Error("could not convert msg to protobut", "err", err) + logger.Error("could not convert msg to protobut", "err", err) return } src.TrySend(BlockchainChannel, msgBytes) @@ -248,45 +253,50 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // Got a peer status. Unverified. bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height) case *bcproto.NoBlockResponse: - bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height) + logger.Debug("Peer does not have requested block", "height", msg.Height) default: - bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } } // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) { + var ( + trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) + statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second) + switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) - trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) - switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) + blocksSynced = uint64(0) - blocksSynced := uint64(0) + chainID = bcR.initialState.ChainID + state = bcR.initialState - chainID := bcR.initialState.ChainID - state := bcR.initialState + lastHundred = time.Now() + lastRate = 0.0 - lastHundred := time.Now() - lastRate := 0.0 - - didProcessCh := make(chan struct{}, 1) + didProcessCh = make(chan struct{}, 1) + ) go func() { for { select { + case <-bcR.Quit(): return + case <-bcR.pool.Quit(): return + case request := <-bcR.requestsCh: peer := bcR.Switch.Peers().Get(request.PeerID) if peer == nil { + bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID) continue } msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height}) if err != nil { - bcR.Logger.Error("could not convert msg to proto", "err", err) + bcR.Logger.Error("could not convert BlockRequest to proto", "err", err) continue } @@ -294,6 +304,7 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) { if !queued { bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height) } + case err := <-bcR.errorsCh: peer := bcR.Switch.Peers().Get(err.peerID) if peer != nil { @@ -302,8 +313,7 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) { case <-statusUpdateTicker.C: // ask for status updates - go bcR.BroadcastStatusRequest() // nolint: errcheck - + go bcR.BroadcastStatusRequest() } } }() @@ -311,27 +321,41 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) { FOR_LOOP: for { select { - case <-switchToConsensusTicker.C: - height, numPending, lenRequesters := bcR.pool.GetStatus() - outbound, inbound, _ := bcR.Switch.NumPeers() - bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters, - "outbound", outbound, "inbound", inbound) - if bcR.pool.IsCaughtUp() { - bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) - if err := bcR.pool.Stop(); err != nil { - bcR.Logger.Error("Error stopping pool", "err", err) - } - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - if ok { - conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) - } - // else { - // should only happen during testing - // } - break FOR_LOOP + case <-switchToConsensusTicker.C: + var ( + height, numPending, lenRequesters = bcR.pool.GetStatus() + outbound, inbound, _ = bcR.Switch.NumPeers() + lastAdvance = bcR.pool.LastAdvance() + ) + + bcR.Logger.Debug("Consensus ticker", + "numPending", numPending, + "total", lenRequesters) + + switch { + case bcR.pool.IsCaughtUp(): + bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) + case time.Since(lastAdvance) > syncTimeout: + bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance)) + default: + bcR.Logger.Info("Not caught up yet", + "height", height, "max_peer_height", bcR.pool.MaxPeerHeight(), + "num_peers", outbound+inbound, + "timeout_in", syncTimeout-time.Since(lastAdvance)) + continue } + if err := bcR.pool.Stop(); err != nil { + bcR.Logger.Error("Error stopping pool", "err", err) + } + conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) + if ok { + conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) + } + + break FOR_LOOP + case <-trySyncTicker.C: // chan time select { case didProcessCh <- struct{}{}: @@ -358,31 +382,37 @@ FOR_LOOP: didProcessCh <- struct{}{} } - firstParts := first.MakePartSet(types.BlockPartSizeBytes) - firstPartSetHeader := firstParts.Header() - firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} + var ( + firstParts = first.MakePartSet(types.BlockPartSizeBytes) + firstPartSetHeader = firstParts.Header() + firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} + ) + // Finally, verify the first block using the second's commit // NOTE: we can probably make this more efficient, but note that calling // first.Hash() doesn't verify the tx contents, so MakePartSet() is // currently necessary. - err := state.Validators.VerifyCommitLight( - chainID, firstID, first.Height, second.LastCommit) + err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit) if err != nil { - bcR.Logger.Error("Error in validation", "err", err) + err = fmt.Errorf("invalid last commit: %w", err) + bcR.Logger.Error(err.Error(), + "last_commit", second.LastCommit, "block_id", firstID, "height", first.Height) + peerID := bcR.pool.RedoRequest(first.Height) peer := bcR.Switch.Peers().Get(peerID) if peer != nil { - // NOTE: we've already removed the peer's request, but we - // still need to clean up the rest. - bcR.Switch.StopPeerForError(peer, fmt.Errorf("blockchainReactor validation error: %v", err)) + // NOTE: we've already removed the peer's request, but we still need + // to clean up the rest. + bcR.Switch.StopPeerForError(peer, err) } + peerID2 := bcR.pool.RedoRequest(second.Height) - peer2 := bcR.Switch.Peers().Get(peerID2) - if peer2 != nil && peer2 != peer { - // NOTE: we've already removed the peer's request, but we - // still need to clean up the rest. - bcR.Switch.StopPeerForError(peer2, fmt.Errorf("blockchainReactor validation error: %v", err)) + if peerID2 != peerID { + if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil { + bcR.Switch.StopPeerForError(peer2, err) + } } + continue FOR_LOOP } else { bcR.pool.PopRequest() @@ -390,8 +420,8 @@ FOR_LOOP: // TODO: batch saves so we dont persist to disk every block bcR.store.SaveBlock(first, firstParts, second.LastCommit) - // TODO: same thing for app - but we would need a way to - // get the hash without persisting the state + // TODO: same thing for app - but we would need a way to get the hash + // without persisting the state. var err error state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { @@ -402,8 +432,8 @@ FOR_LOOP: if blocksSynced%100 == 0 { lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, - "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) + bcR.Logger.Info("Fast Sync Rate", + "height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) lastHundred = time.Now() } } @@ -416,14 +446,13 @@ FOR_LOOP: } // BroadcastStatusRequest broadcasts `BlockStore` base and height. -func (bcR *BlockchainReactor) BroadcastStatusRequest() error { +func (bcR *BlockchainReactor) BroadcastStatusRequest() { bm, err := bc.EncodeMsg(&bcproto.StatusRequest{}) if err != nil { - bcR.Logger.Error("could not convert msg to proto", "err", err) - return fmt.Errorf("could not convert msg to proto: %w", err) + bcR.Logger.Error("could not convert StatusRequest to proto", "err", err) + return } - bcR.Switch.Broadcast(BlockchainChannel, bm) - - return nil + // We don't care about whenever broadcast is successful or not. + _ = bcR.Switch.Broadcast(BlockchainChannel, bm) } diff --git a/blockchain/v2/io.go b/blockchain/v2/io.go index 4951573ce..85d929968 100644 --- a/blockchain/v2/io.go +++ b/blockchain/v2/io.go @@ -1,7 +1,7 @@ package v2 import ( - "fmt" + "errors" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/p2p" @@ -10,6 +10,11 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + errPeerQueueFull = errors.New("peer queue full") + errNoPeer = errors.New("peer not found") +) + type iIO interface { sendBlockRequest(peerID p2p.ID, height int64) error sendBlockToPeer(block *types.Block, peerID p2p.ID) error @@ -45,7 +50,7 @@ type consensusReactor interface { func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error { peer := sio.sw.Peers().Get(peerID) if peer == nil { - return fmt.Errorf("peer not found") + return errNoPeer } msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height}) if err != nil { @@ -54,7 +59,7 @@ func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error { queued := peer.TrySend(BlockchainChannel, msgBytes) if !queued { - return fmt.Errorf("send queue full") + return errPeerQueueFull } return nil } @@ -62,7 +67,7 @@ func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error { func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) error { peer := sio.sw.Peers().Get(peerID) if peer == nil { - return fmt.Errorf("peer not found") + return errNoPeer } msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base}) @@ -71,7 +76,7 @@ func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) } if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { - return fmt.Errorf("peer queue full") + return errPeerQueueFull } return nil @@ -80,7 +85,7 @@ func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { peer := sio.sw.Peers().Get(peerID) if peer == nil { - return fmt.Errorf("peer not found") + return errNoPeer } if block == nil { panic("trying to send nil block") @@ -96,7 +101,7 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { return err } if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { - return fmt.Errorf("peer queue full") + return errPeerQueueFull } return nil @@ -105,7 +110,7 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error { peer := sio.sw.Peers().Get(peerID) if peer == nil { - return fmt.Errorf("peer not found") + return errNoPeer } msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height}) if err != nil { @@ -113,7 +118,7 @@ func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error { } if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { - return fmt.Errorf("peer queue full") + return errPeerQueueFull } return nil diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index ecba2c39f..26be63ea7 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -47,11 +47,6 @@ type BlockchainReactor struct { store blockStore } -//nolint:unused,deadcode -type blockVerifier interface { - VerifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error -} - type blockApplier interface { ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error) } @@ -458,39 +453,40 @@ func (r *BlockchainReactor) Stop() error { // XXX: do not call any methods that can block or incur heavy processing. // https://github.com/tendermint/tendermint/issues/2888 func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + logger := r.logger.With("src", src.ID(), "chID", chID) + msg, err := bc.DecodeMsg(msgBytes) if err != nil { - r.logger.Error("error decoding message", - "src", src.ID(), "chId", chID, "msg", msg, "err", err) + logger.Error("error decoding message", "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return } if err = bc.ValidateMsg(msg); err != nil { - r.logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + logger.Error("peer sent us invalid msg", "msg", msg, "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return } - r.logger.Debug("Receive", "src", src.ID(), "chID", chID, "msg", msg) + r.logger.Debug("Receive", "msg", msg) switch msg := msg.(type) { case *bcproto.StatusRequest: if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src.ID()); err != nil { - r.logger.Error("Could not send status message to peer", "src", src) + logger.Error("Could not send status message to src peer") } case *bcproto.BlockRequest: block := r.store.LoadBlock(msg.Height) if block != nil { if err = r.io.sendBlockToPeer(block, src.ID()); err != nil { - r.logger.Error("Could not send block message to peer: ", err) + logger.Error("Could not send block message to src peer", "err", err) } } else { - r.logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height) + logger.Info("peer asking for a block we don't have", "height", msg.Height) peerID := src.ID() if err = r.io.sendBlockNotFound(msg.Height, peerID); err != nil { - r.logger.Error("Couldn't send block not found: ", err) + logger.Error("Couldn't send block not found msg", "err", err) } } @@ -505,7 +501,7 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { r.mtx.RLock() bi, err := types.BlockFromProto(msg.Block) if err != nil { - r.logger.Error("error transitioning block from protobuf", "err", err) + logger.Error("error transitioning block from protobuf", "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return }