diff --git a/blockchain/pool.go b/blockchain/pool.go index 603b4bf2a..bd11f0943 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -133,7 +133,7 @@ func (pool *BlockPool) removeTimedoutPeers() { // curRate can be 0 on start if curRate != 0 && curRate < minRecvRate { err := errors.New("peer is not sending us data fast enough") - pool.sendError(err, peer.id) + go pool.sendError(err, peer.id) pool.Logger.Error("SendTimeout", "peer", peer.id, "reason", err, "curRate", fmt.Sprintf("%d KB/s", curRate/1024), @@ -232,14 +232,16 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int defer pool.mtx.Unlock() requester := pool.requesters[block.Height] + poolHeight := pool.height + if requester == nil { - pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height) - diff := pool.height - block.Height + pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", poolHeight, "blockHeight", block.Height) + diff := poolHeight - block.Height if diff < 0 { diff *= -1 } if diff > maxDiffBetweenCurrentAndReceivedBlockHeight { - pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) + go pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) } return } @@ -247,6 +249,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int if requester.setBlock(block, peerID) { pool.numPending-- peer := pool.peers[peerID] + if peer != nil { peer.decrPending(blockSize) } @@ -292,7 +295,8 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { if requester.getPeerID() == peerID { if requester.getBlock() != nil { - pool.numPending++ + continue + // pool.numPending++ } go requester.redo() // pick another peer and ... } @@ -443,7 +447,7 @@ func (peer *bpPeer) onTimeout() { defer peer.pool.mtx.Unlock() err := errors.New("peer did not send us anything") - peer.pool.sendError(err, peer.id) + go peer.pool.sendError(err, peer.id) peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout) peer.didTimeout = true } @@ -517,7 +521,14 @@ func (bpr *bpRequester) reset() { // Tells bpRequester to pick another peer and try again. // NOTE: blocking func (bpr *bpRequester) redo() { - bpr.redoCh <- struct{}{} + select { + case bpr.redoCh <- struct{}{}: + case <-bpr.Quit(): + return + case <-bpr.pool.Quit(): + bpr.Stop() + return + } } // Responsible for making more requests as necessary @@ -556,17 +567,8 @@ OUTER_LOOP: bpr.reset() continue OUTER_LOOP // When peer is removed case <-bpr.gotBlockCh: - // We got the block, now see if it's good. - select { - case <-bpr.pool.Quit(): - bpr.Stop() - return - case <-bpr.Quit(): - return - case <-bpr.redoCh: - bpr.reset() - continue OUTER_LOOP - } + bpr.Stop() + return } } }