mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-22 15:11:29 +00:00
blockchain/v1: handle peers without blocks (#5701)
Closes #5444 Now we record the fact that a peer does not have a requested block and later use this information to make a new request for the same block from another peer.
This commit is contained in:
@@ -25,3 +25,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
|
||||
|
||||
@@ -31,6 +31,7 @@ type BpPeer struct {
|
||||
Height int64 // the peer reported height
|
||||
NumPendingBlockRequests int // number of requests still waiting for block responses
|
||||
blocks map[int64]*types.Block // blocks received or expected to be received from this peer
|
||||
noBlocks map[int64]struct{} // heights for which the peer does not have blocks
|
||||
blockResponseTimer *time.Timer
|
||||
recvMonitor *flow.Monitor
|
||||
params *BpPeerParams // parameters for timer and monitor
|
||||
@@ -46,13 +47,14 @@ func NewBpPeer(peerID p2p.ID, base int64, height int64,
|
||||
params = BpPeerDefaultParams()
|
||||
}
|
||||
return &BpPeer{
|
||||
ID: peerID,
|
||||
Base: base,
|
||||
Height: height,
|
||||
blocks: make(map[int64]*types.Block, maxRequestsPerPeer),
|
||||
logger: log.NewNopLogger(),
|
||||
onErr: onErr,
|
||||
params: params,
|
||||
ID: peerID,
|
||||
Base: base,
|
||||
Height: height,
|
||||
blocks: make(map[int64]*types.Block, maxRequestsPerPeer),
|
||||
noBlocks: make(map[int64]struct{}),
|
||||
logger: log.NewNopLogger(),
|
||||
onErr: onErr,
|
||||
params: params,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,6 +133,19 @@ func (peer *BpPeer) RemoveBlock(height int64) {
|
||||
delete(peer.blocks, height)
|
||||
}
|
||||
|
||||
// SetNoBlock records that the peer does not have a block for height.
|
||||
func (peer *BpPeer) SetNoBlock(height int64) {
|
||||
peer.noBlocks[height] = struct{}{}
|
||||
}
|
||||
|
||||
// NoBlock returns true if the peer does not have a block for height.
|
||||
func (peer *BpPeer) NoBlock(height int64) bool {
|
||||
if _, ok := peer.noBlocks[height]; ok {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// RequestSent records that a request was sent, and starts the peer timer and monitor if needed.
|
||||
func (peer *BpPeer) RequestSent(height int64) {
|
||||
peer.blocks[height] = nil
|
||||
|
||||
@@ -100,6 +100,18 @@ func (pool *BlockPool) UpdatePeer(peerID p2p.ID, base int64, height int64) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetNoBlock records that the peer does not have a block for height and
|
||||
// schedules a new request for that height from another peer.
|
||||
func (pool *BlockPool) SetNoBlock(peerID p2p.ID, height int64) {
|
||||
peer := pool.peers[peerID]
|
||||
if peer == nil {
|
||||
return
|
||||
}
|
||||
peer.SetNoBlock(height)
|
||||
|
||||
pool.rescheduleRequest(peerID, height)
|
||||
}
|
||||
|
||||
// Cleans and deletes the peer. Recomputes the max peer height.
|
||||
func (pool *BlockPool) deletePeer(peer *BpPeer) {
|
||||
if peer == nil {
|
||||
@@ -214,7 +226,7 @@ func (pool *BlockPool) sendRequest(height int64) bool {
|
||||
if peer.NumPendingBlockRequests >= maxRequestsPerPeer {
|
||||
continue
|
||||
}
|
||||
if peer.Base > height || peer.Height < height {
|
||||
if peer.Base > height || peer.Height < height || peer.NoBlock(height) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -277,6 +277,7 @@ func init() {
|
||||
return waitForBlock, err
|
||||
case noBlockResponseEv:
|
||||
fsm.logger.Error("peer does not have requested block", "peer", data.peerID)
|
||||
fsm.pool.SetNoBlock(data.peerID, data.height)
|
||||
|
||||
return waitForBlock, nil
|
||||
case processedBlockEv:
|
||||
|
||||
@@ -102,6 +102,19 @@ func sProcessedBlockEv(current, expected string, reactorError error) fsmStepTest
|
||||
}
|
||||
}
|
||||
|
||||
func sNoBlockResponseEv(current, expected string, peerID p2p.ID, height int64, err error) fsmStepTestValues {
|
||||
return fsmStepTestValues{
|
||||
currentState: current,
|
||||
event: noBlockResponseEv,
|
||||
data: bReactorEventData{
|
||||
peerID: peerID,
|
||||
height: height,
|
||||
},
|
||||
wantState: expected,
|
||||
wantErr: err,
|
||||
}
|
||||
}
|
||||
|
||||
func sStatusEv(current, expected string, peerID p2p.ID, height int64, err error) fsmStepTestValues {
|
||||
return fsmStepTestValues{
|
||||
currentState: current,
|
||||
@@ -354,6 +367,46 @@ func TestFSMBlockVerificationFailure(t *testing.T) {
|
||||
executeFSMTests(t, tests, false)
|
||||
}
|
||||
|
||||
func TestFSMNoBlockResponse(t *testing.T) {
|
||||
tests := []testFields{
|
||||
{
|
||||
name: "no block response",
|
||||
startingHeight: 1,
|
||||
maxRequestsPerPeer: 3,
|
||||
steps: []fsmStepTestValues{
|
||||
sStartFSMEv(),
|
||||
|
||||
// add P1 and get blocks 1-3 from it
|
||||
sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
|
||||
sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
|
||||
sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}),
|
||||
sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}),
|
||||
sBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}),
|
||||
|
||||
// add P2
|
||||
sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
|
||||
|
||||
// process block failure, should remove P1 and all blocks
|
||||
sNoBlockResponseEv("waitForBlock", "waitForBlock", "P1", 1, nil),
|
||||
sNoBlockResponseEv("waitForBlock", "waitForBlock", "P1", 2, nil),
|
||||
sNoBlockResponseEv("waitForBlock", "waitForBlock", "P1", 3, nil),
|
||||
|
||||
// get blocks 1-3 from P2
|
||||
sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
|
||||
sBlockRespEv("waitForBlock", "waitForBlock", "P2", 1, []int64{}),
|
||||
sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}),
|
||||
sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}),
|
||||
|
||||
// finish after processing blocks 1 and 2
|
||||
sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
|
||||
sProcessedBlockEv("waitForBlock", "finished", nil),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
executeFSMTests(t, tests, false)
|
||||
}
|
||||
|
||||
func TestFSMBadBlockFromPeer(t *testing.T) {
|
||||
tests := []testFields{
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user