From 14161ea39c193fe55a5dad04763fe43d22da9bda Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 22 Mar 2015 16:23:24 -0700 Subject: [PATCH] Blockpool tests --- block/pool.go | 90 ++++++++++++++++++++++++++++------------------ block/pool_test.go | 49 +++++++++++++++++++++---- 2 files changed, 99 insertions(+), 40 deletions(-) diff --git a/block/pool.go b/block/pool.go index a50bc7ed1..48d86c24c 100644 --- a/block/pool.go +++ b/block/pool.go @@ -130,8 +130,7 @@ func (bp *BlockPool) handleEvent(event_ interface{}) { } } } - case bpPeerStatus: - // we have updated (or new) status from peer, + case bpPeerStatus: // updated or new status from peer // request blocks if possible. peer := bp.peers[event.peerId] if peer == nil { @@ -139,30 +138,45 @@ func (bp *BlockPool) handleEvent(event_ interface{}) { bp.peers[peer.id] = peer } bp.requestBlocksFromPeer(peer) - case bpRequestTimeout: + case bpRequestTimeout: // unconditional timeout for each peer's request. peer := bp.peers[event.peerId] - request := peer.requests[event.height] + if peer == nil { + // cleanup was already handled. + return + } + height := event.height + request := peer.requests[height] if request == nil || request.block != nil { - // a request for event.height might have timed out for peer. - // but not necessarily, the timeout is unconditional. - } else { - peer.bad++ - if request.tries < maxTries { - // try again, start timer again. - request.start(bp.eventsCh) - msg := BlockRequest{event.height, peer.id} - go func() { bp.requestsCh <- msg }() - } else { - // delete the request. - if peer != nil { - delete(peer.requests, event.height) - } - blockInfo := bp.blockInfos[event.height] - if blockInfo != nil { - delete(blockInfo.requests, peer.id) - } - go func() { bp.timeoutsCh <- peer.id }() + // the request was fulfilled by some peer or this peer. + return + } + + // A request for peer timed out. + peer.bad++ + if request.tries < maxTries { + log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id) + // try again. + select { + case bp.requestsCh <- BlockRequest{height, peer.id}: + request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries + default: + // The request cannot be made because requestCh is full. + // Just delete the request. + delete(peer.requests, height) } + } else { + log.Warn("Timeout: Deleting request") + // delete the request. + delete(peer.requests, height) + blockInfo := bp.blockInfos[height] + if blockInfo != nil { + delete(blockInfo.requests, peer.id) + } + select { + case bp.timeoutsCh <- peer.id: + default: + } + } } } @@ -186,16 +200,21 @@ func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { needsMorePeers := blockInfo.needsMorePeers() alreadyAskedPeer := blockInfo.requests[peer.id] != nil if needsMorePeers && !alreadyAskedPeer { - // Create a new request and start the timer. - request := &bpBlockRequest{ - height: height, - peer: peer, + select { + case bp.requestsCh <- BlockRequest{height, peer.id}: + // Create a new request and start the timer. + request := &bpBlockRequest{ + height: height, + peer: peer, + } + blockInfo.requests[peer.id] = request + peer.requests[height] = request + request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries + default: + // The request cannot be made because requestCh is full. + // Just stop. + return } - blockInfo.requests[peer.id] = request - peer.requests[height] = request - request.start(bp.eventsCh) - msg := BlockRequest{height, peer.id} - go func() { bp.requestsCh <- msg }() } } } @@ -203,6 +222,8 @@ func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { func (bp *BlockPool) makeMoreBlockInfos() { // make more requests if necessary. for i := 0; i < requestBatchSize; i++ { + //log.Debug("Confused?", + // "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests) if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests { // Make a request for the next block height requestHeight := bp.height + uint(bp.numTotal) @@ -232,6 +253,7 @@ func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer { return chosen } +// blocking func (bp *BlockPool) pushBlocksFromStart() { for height := bp.height; ; height++ { // push block to blocksCh. @@ -242,7 +264,7 @@ func (bp *BlockPool) pushBlocksFromStart() { bp.numTotal-- bp.height++ delete(bp.blockInfos, height) - go func() { bp.blocksCh <- blockInfo.block }() + bp.blocksCh <- blockInfo.block } } @@ -277,7 +299,7 @@ type bpBlockRequest struct { // bump tries++ and set timeout. // NOTE: the timer is unconditional. -func (request bpBlockRequest) start(eventsCh chan<- interface{}) { +func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) { request.tries++ time.AfterFunc(requestTimeoutSeconds*time.Second, func() { eventsCh <- bpRequestTimeout{ diff --git a/block/pool_test.go b/block/pool_test.go index 2b8aa0868..e5d2d00ef 100644 --- a/block/pool_test.go +++ b/block/pool_test.go @@ -28,9 +28,9 @@ func TestBasic(t *testing.T) { start := uint(42) maxHeight := uint(300) - timeoutsCh := make(chan string) - requestsCh := make(chan BlockRequest) - blocksCh := make(chan *Block) + timeoutsCh := make(chan string, 100) + requestsCh := make(chan BlockRequest, 100) + blocksCh := make(chan *Block, 100) pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) pool.Start() @@ -50,15 +50,15 @@ func TestBasic(t *testing.T) { case peerId := <-timeoutsCh: t.Errorf("timeout: %v", peerId) case request := <-requestsCh: - log.Debug("Pulled new BlockRequest", "request", request) + log.Debug("TEST: Pulled new BlockRequest", "request", request) // After a while, pretend like we got a block from the peer. go func() { block := &Block{Header: &Header{Height: request.Height}} pool.AddBlock(block, request.PeerId) - log.Debug("Added block", "block", request.Height, "peer", request.PeerId) + log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId) }() case block := <-blocksCh: - log.Debug("Pulled new Block", "height", block.Height) + log.Debug("TEST: Pulled new Block", "height", block.Height) if block.Height != lastSeenBlock+1 { t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height) } @@ -69,6 +69,43 @@ func TestBasic(t *testing.T) { } } + pool.Stop() +} + +func TestTimeout(t *testing.T) { + peers := makePeers(100, 0, 1000) + start := uint(42) + timeoutsCh := make(chan string, 10) + requestsCh := make(chan BlockRequest, 10) + blocksCh := make(chan *Block, 100) + + pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + pool.Start() + + // Introduce each peer. + go func() { + for _, peer := range peers { + pool.SetPeerStatus(peer.id, peer.height) + } + }() + + // Pull from channels + for { + select { + case peerId := <-timeoutsCh: + // Timed out. Done! + if peers[peerId].id != peerId { + t.Errorf("Unexpected peer from timeoutsCh") + } + //return + case _ = <-requestsCh: + // Don't do anything, let it time out. + case _ = <-blocksCh: + t.Errorf("Got block when none expected") + return + } + } + pool.Stop() }