From 0c7e871ef071f7bdb8b125327707ecc2654aeaf6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 26 Feb 2018 17:35:01 +0400 Subject: [PATCH] [blockchain] replace timeoutsCh with more abstract errorsCh --- blockchain/pool.go | 27 +++++++++++++++------------ blockchain/pool_test.go | 25 +++++++++++++------------ blockchain/reactor.go | 31 +++++++++++++++++++------------ 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 688be0e72..5de719792 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -1,6 +1,7 @@ package blockchain import ( + "errors" "fmt" "math" "sync" @@ -41,7 +42,7 @@ const ( minRecvRate = 7680 ) -var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests +var peerTimeout = 15 * time.Second // not const so we can override with tests /* Peers self report their heights when we join the block pool. @@ -68,10 +69,10 @@ type BlockPool struct { maxPeerHeight int64 requestsCh chan<- BlockRequest - timeoutsCh chan<- p2p.ID + errorsCh chan<- peerError } -func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- p2p.ID) *BlockPool { +func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), @@ -80,7 +81,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- numPending: 0, requestsCh: requestsCh, - timeoutsCh: timeoutsCh, + errorsCh: errorsCh, } bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp) return bp @@ -128,9 +129,10 @@ func (pool *BlockPool) removeTimedoutPeers() { curRate := peer.recvMonitor.Status().CurRate // curRate can be 0 on start if curRate != 0 && curRate < minRecvRate { - pool.sendTimeout(peer.id) + err := errors.New("peer is not sending us data fast enough") + pool.sendError(err, peer.id) pool.Logger.Error("SendTimeout", "peer", peer.id, - "reason", "peer is not sending us data fast enough", + "reason", err, "curRate", fmt.Sprintf("%d KB/s", curRate/1024), "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) peer.didTimeout = true @@ -340,11 +342,11 @@ func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) { pool.requestsCh <- BlockRequest{height, peerID} } -func (pool *BlockPool) sendTimeout(peerID p2p.ID) { +func (pool *BlockPool) sendError(err error, peerID p2p.ID) { if !pool.IsRunning() { return } - pool.timeoutsCh <- peerID + pool.errorsCh <- peerError{err, peerID} } // unused by tendermint; left for debugging purposes @@ -403,9 +405,9 @@ func (peer *bpPeer) resetMonitor() { func (peer *bpPeer) resetTimeout() { if peer.timeout == nil { - peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) + peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout) } else { - peer.timeout.Reset(time.Second * peerTimeoutSeconds) + peer.timeout.Reset(peerTimeout) } } @@ -431,8 +433,9 @@ func (peer *bpPeer) onTimeout() { peer.pool.mtx.Lock() defer peer.pool.mtx.Unlock() - peer.pool.sendTimeout(peer.id) - peer.logger.Error("SendTimeout", "reason", "onTimeout") + err := errors.New("peer did not send us anything") + peer.pool.sendError(err, peer.id) + peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout) peer.didTimeout = true } diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index ce16899a7..790216ac0 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -13,7 +13,7 @@ import ( ) func init() { - peerTimeoutSeconds = time.Duration(2) + peerTimeout = 2 * time.Second } type testPeer struct { @@ -34,9 +34,9 @@ func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer { func TestBasic(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) - timeoutsCh := make(chan p2p.ID, 100) - requestsCh := make(chan BlockRequest, 100) - pool := NewBlockPool(start, requestsCh, timeoutsCh) + errorsCh := make(chan peerError) + requestsCh := make(chan BlockRequest) + pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) err := pool.Start() @@ -71,8 +71,8 @@ func TestBasic(t *testing.T) { // Pull from channels for { select { - case peerID := <-timeoutsCh: - t.Errorf("timeout: %v", peerID) + case err := <-errorsCh: + t.Error(err) case request := <-requestsCh: t.Logf("Pulled new BlockRequest %v", request) if request.Height == 300 { @@ -91,9 +91,9 @@ func TestBasic(t *testing.T) { func TestTimeout(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) - timeoutsCh := make(chan p2p.ID, 100) - requestsCh := make(chan BlockRequest, 100) - pool := NewBlockPool(start, requestsCh, timeoutsCh) + errorsCh := make(chan peerError) + requestsCh := make(chan BlockRequest) + pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) err := pool.Start() if err != nil { @@ -132,9 +132,10 @@ func TestTimeout(t *testing.T) { timedOut := map[p2p.ID]struct{}{} for { select { - case peerID := <-timeoutsCh: - t.Logf("Peer %v timeouted", peerID) - if _, ok := timedOut[peerID]; !ok { + case err := <-errorsCh: + t.Log(err) + // consider error to be always timeout here + if _, ok := timedOut[err.peerID]; !ok { counter++ if counter == len(peers) { return // Done! diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 2ad6770be..cae55e807 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -22,8 +22,7 @@ const ( // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) BlockchainChannel = byte(0x40) - defaultChannelCapacity = 1000 - trySyncIntervalMS = 50 + trySyncIntervalMS = 50 // stop syncing when last block's time is // within this much of the system time. // stopSyncingDurationMinutes = 10 @@ -40,6 +39,15 @@ type consensusReactor interface { SwitchToConsensus(sm.State, int) } +type peerError struct { + err error + peerID p2p.ID +} + +func (e peerError) Error() string { + return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) +} + // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor @@ -56,7 +64,7 @@ type BlockchainReactor struct { fastSync bool requestsCh <-chan BlockRequest - timeoutsCh <-chan p2p.ID + errorsCh <-chan peerError } // NewBlockchainReactor returns new reactor instance. @@ -68,12 +76,12 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } - requestsCh := make(chan BlockRequest, defaultChannelCapacity) - timeoutsCh := make(chan p2p.ID, defaultChannelCapacity) + requestsCh := make(chan BlockRequest) + errorsCh := make(chan peerError) pool := NewBlockPool( store.Height()+1, requestsCh, - timeoutsCh, + errorsCh, ) bcR := &BlockchainReactor{ params: state.ConsensusParams, @@ -83,7 +91,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl pool: pool, fastSync: fastSync, requestsCh: requestsCh, - timeoutsCh: timeoutsCh, + errorsCh: errorsCh, } bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) return bcR @@ -230,7 +238,7 @@ func (bcR *BlockchainReactor) poolRoutine() { FOR_LOOP: for { select { - case request := <-bcR.requestsCh: // chan BlockRequest + case request := <-bcR.requestsCh: peer := bcR.Switch.Peers().Get(request.PeerID) if peer == nil { continue FOR_LOOP // Peer has since been disconnected. @@ -242,11 +250,10 @@ FOR_LOOP: // The pool handles timeouts, just let it go. continue FOR_LOOP } - case peerID := <-bcR.timeoutsCh: // chan string - // Peer timed out. - peer := bcR.Switch.Peers().Get(peerID) + case err := <-bcR.errorsCh: + peer := bcR.Switch.Peers().Get(err.peerID) if peer != nil { - bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + bcR.Switch.StopPeerForError(peer, err) } case <-statusUpdateTicker.C: // ask for status updates