mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-06 03:50:46 +00:00
blockchain: working on concurrency issues
This commit is contained in:
@@ -133,7 +133,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
|
||||
// curRate can be 0 on start
|
||||
if curRate != 0 && curRate < minRecvRate {
|
||||
err := errors.New("peer is not sending us data fast enough")
|
||||
pool.sendError(err, peer.id)
|
||||
go pool.sendError(err, peer.id)
|
||||
pool.Logger.Error("SendTimeout", "peer", peer.id,
|
||||
"reason", err,
|
||||
"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
|
||||
@@ -232,14 +232,16 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
requester := pool.requesters[block.Height]
|
||||
poolHeight := pool.height
|
||||
|
||||
if requester == nil {
|
||||
pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
|
||||
diff := pool.height - block.Height
|
||||
pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", poolHeight, "blockHeight", block.Height)
|
||||
diff := poolHeight - block.Height
|
||||
if diff < 0 {
|
||||
diff *= -1
|
||||
}
|
||||
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
|
||||
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
|
||||
go pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -247,6 +249,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||
if requester.setBlock(block, peerID) {
|
||||
pool.numPending--
|
||||
peer := pool.peers[peerID]
|
||||
|
||||
if peer != nil {
|
||||
peer.decrPending(blockSize)
|
||||
}
|
||||
@@ -292,7 +295,8 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) {
|
||||
for _, requester := range pool.requesters {
|
||||
if requester.getPeerID() == peerID {
|
||||
if requester.getBlock() != nil {
|
||||
pool.numPending++
|
||||
continue
|
||||
// pool.numPending++
|
||||
}
|
||||
go requester.redo() // pick another peer and ...
|
||||
}
|
||||
@@ -443,7 +447,7 @@ func (peer *bpPeer) onTimeout() {
|
||||
defer peer.pool.mtx.Unlock()
|
||||
|
||||
err := errors.New("peer did not send us anything")
|
||||
peer.pool.sendError(err, peer.id)
|
||||
go peer.pool.sendError(err, peer.id)
|
||||
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
|
||||
peer.didTimeout = true
|
||||
}
|
||||
@@ -517,7 +521,14 @@ func (bpr *bpRequester) reset() {
|
||||
// Tells bpRequester to pick another peer and try again.
|
||||
// NOTE: blocking
|
||||
func (bpr *bpRequester) redo() {
|
||||
bpr.redoCh <- struct{}{}
|
||||
select {
|
||||
case bpr.redoCh <- struct{}{}:
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-bpr.pool.Quit():
|
||||
bpr.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Responsible for making more requests as necessary
|
||||
@@ -556,17 +567,8 @@ OUTER_LOOP:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP // When peer is removed
|
||||
case <-bpr.gotBlockCh:
|
||||
// We got the block, now see if it's good.
|
||||
select {
|
||||
case <-bpr.pool.Quit():
|
||||
bpr.Stop()
|
||||
return
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-bpr.redoCh:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
bpr.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user