From a4606f1c5ef3f9b97f14f35ee83dbb7a064d5e9c Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 22 Mar 2015 19:20:54 -0700 Subject: [PATCH 01/15] Make pool tests faster --- blockchain/pool.go | 5 ++++- blockchain/pool_test.go | 5 +++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 528c66548..3de2c2969 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -12,7 +12,6 @@ import ( const ( maxOutstandingRequestsPerPeer = 10 eventsChannelCapacity = 100 - requestTimeoutSeconds = 10 maxTries = 3 requestIntervalMS = 500 requestBatchSize = 50 @@ -21,6 +20,10 @@ const ( maxPeersPerRequest = 1 ) +var ( + requestTimeoutSeconds = time.Duration(10) +) + type BlockRequest struct { Height uint PeerId string diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 4376f3ac0..a2578aa13 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -3,6 +3,7 @@ package blockchain import ( "math/rand" "testing" + "time" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/types" @@ -74,6 +75,10 @@ func TestBasic(t *testing.T) { } func TestTimeout(t *testing.T) { + origRequestTimeoutSeconds := requestTimeoutSeconds + requestTimeoutSeconds = time.Duration(0) + defer func() { requestTimeoutSeconds = origRequestTimeoutSeconds }() + peers := makePeers(100, 0, 1000) start := uint(42) timeoutsCh := make(chan string, 10) From 9703d34b65682930d5d8f267a94eecde8e742545 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 24 Mar 2015 11:02:30 -0700 Subject: [PATCH 02/15] fixed pool, using locks now. --- blockchain/pool.go | 571 +++++++++++++++++++++------------------- blockchain/pool_test.go | 91 ++++--- 2 files changed, 347 insertions(+), 315 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 3de2c2969..3e3313bfb 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -1,7 +1,7 @@ package blockchain import ( - "math/rand" + "sync" "sync/atomic" "time" @@ -11,347 +11,368 @@ import ( const ( maxOutstandingRequestsPerPeer = 10 - eventsChannelCapacity = 100 + inputsChannelCapacity = 100 maxTries = 3 requestIntervalMS = 500 requestBatchSize = 50 maxPendingRequests = 50 maxTotalRequests = 100 - maxPeersPerRequest = 1 + maxRequestsPerPeer = 20 ) var ( - requestTimeoutSeconds = time.Duration(10) + requestTimeoutSeconds = time.Duration(1) ) -type BlockRequest struct { - Height uint - PeerId string -} - type BlockPool struct { - peers map[string]*bpPeer - blockInfos map[uint]*bpBlockInfo - height uint // the lowest key in blockInfos. - started int32 // atomic - stopped int32 // atomic - numPending int32 - numTotal int32 - eventsCh chan interface{} // internal events. - requestsCh chan<- BlockRequest // output of new requests to make. - timeoutsCh chan<- string // output of peers that timed out. - blocksCh chan<- *types.Block // output of ordered blocks. - repeater *RepeatTimer // for requesting more bocks. - quit chan struct{} + // block requests + requestsMtx sync.Mutex + requests map[uint]*bpRequest + height uint // the lowest key in requests. + numPending int32 + numTotal int32 + + // peers + peersMtx sync.Mutex + peers map[string]*bpPeer + + requestsCh chan<- BlockRequest + timeoutsCh chan<- string + repeater *RepeatTimer + + running int32 // atomic } -func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *types.Block) *BlockPool { +func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { return &BlockPool{ - peers: make(map[string]*bpPeer), - blockInfos: make(map[uint]*bpBlockInfo), + peers: make(map[string]*bpPeer), + + requests: make(map[uint]*bpRequest), height: start, - started: 0, - stopped: 0, numPending: 0, numTotal: 0, - quit: make(chan struct{}), - eventsCh: make(chan interface{}, eventsChannelCapacity), requestsCh: requestsCh, timeoutsCh: timeoutsCh, - blocksCh: blocksCh, repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), + + running: 0, } } func (bp *BlockPool) Start() { - if atomic.CompareAndSwapInt32(&bp.started, 0, 1) { + if atomic.CompareAndSwapInt32(&bp.running, 0, 1) { log.Info("Starting BlockPool") go bp.run() } } func (bp *BlockPool) Stop() { - if atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) { + if atomic.CompareAndSwapInt32(&bp.running, 1, 0) { log.Info("Stopping BlockPool") - close(bp.quit) - close(bp.eventsCh) - close(bp.requestsCh) - close(bp.timeoutsCh) - close(bp.blocksCh) bp.repeater.Stop() } } -// AddBlock should be called when a block is received. -func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { - bp.eventsCh <- bpBlockResponse{block, peerId} +func (bp *BlockPool) IsRunning() bool { + return atomic.LoadInt32(&bp.running) == 1 } -func (bp *BlockPool) SetPeerStatus(peerId string, height uint) { - bp.eventsCh <- bpPeerStatus{peerId, height} -} - -// Runs in a goroutine and processes messages. +// Run spawns requests as needed. func (bp *BlockPool) run() { -FOR_LOOP: +RUN_LOOP: for { - select { - case msg := <-bp.eventsCh: - bp.handleEvent(msg) - case <-bp.repeater.Ch: - bp.makeMoreBlockInfos() - bp.requestBlocksFromRandomPeers(10) - case <-bp.quit: - break FOR_LOOP + if atomic.LoadInt32(&bp.running) == 0 { + break RUN_LOOP } - } -} - -func (bp *BlockPool) handleEvent(event_ interface{}) { - switch event := event_.(type) { - case bpBlockResponse: - peer := bp.peers[event.peerId] - blockInfo := bp.blockInfos[event.block.Height] - if blockInfo == nil { - // block was unwanted. - if peer != nil { - peer.bad++ - } + height, numPending, numTotal := bp.GetStatus() + log.Debug("BlockPool.run", "height", height, "numPending", numPending, + "numTotal", numTotal) + if numPending >= maxPendingRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) + } else if numTotal >= maxTotalRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) } else { - // block was wanted. - if peer != nil { - peer.good++ - } - delete(peer.requests, event.block.Height) - if blockInfo.block == nil { - // peer is the first to give it to us. - blockInfo.block = event.block - blockInfo.blockBy = peer.id - bp.numPending-- - if event.block.Height == bp.height { - go bp.pushBlocksFromStart() - } - } - } - case bpPeerStatus: // updated or new status from peer - // request blocks if possible. - peer := bp.peers[event.peerId] - if peer == nil { - peer = bpNewPeer(event.peerId, event.height) - bp.peers[peer.id] = peer - } - bp.requestBlocksFromPeer(peer) - case bpRequestTimeout: // unconditional timeout for each peer's request. - peer := bp.peers[event.peerId] - if peer == nil { - // cleanup was already handled. - return - } - height := event.height - request := peer.requests[height] - if request == nil || request.block != nil { - // the request was fulfilled by some peer or this peer. - return - } - - // A request for peer timed out. - peer.bad++ - if request.tries < maxTries { - log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id) - // try again. - select { - case bp.requestsCh <- BlockRequest{height, peer.id}: - request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries - default: - // The request cannot be made because requestCh is full. - // Just delete the request. - delete(peer.requests, height) - } - } else { - log.Warn("Timeout: Deleting request") - // delete the request. - delete(peer.requests, height) - blockInfo := bp.blockInfos[height] - if blockInfo != nil { - delete(blockInfo.requests, peer.id) - } - select { - case bp.timeoutsCh <- peer.id: - default: - } - + // request for more blocks. + height := bp.nextHeight() + bp.makeRequest(height) } } } -// NOTE: This function is sufficient, but we should find pending blocks -// and sample the peers in one go rather than the current O(n^2) impl. -func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) { - chosen := bp.pickAvailablePeers(maxPeers) - log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen)) - for _, peer := range chosen { - bp.requestBlocksFromPeer(peer) +func (bp *BlockPool) GetStatus() (uint, int32, int32) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + return bp.height, bp.numPending, bp.numTotal +} + +// We need to see the second block's Validation to validate the first block. +// So we peek two blocks at a time. +func (bp *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + if r := bp.requests[bp.height]; r != nil { + first = r.block + } + if r := bp.requests[bp.height+1]; r != nil { + second = r.block + } + return +} + +// Pop the first block at bp.height +// It must have been validated by 'second'.Validation from PeekTwoBlocks(). +func (bp *BlockPool) PopRequest() { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + if r := bp.requests[bp.height]; r == nil || r.block == nil { + panic("PopRequest() requires a valid block") + } + + delete(bp.requests, bp.height) + bp.height++ + bp.numTotal-- +} + +// Invalidates the block at bp.height. +// Remove the peer and request from others. +func (bp *BlockPool) RedoRequest(height uint) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[height] + if request.block == nil { + panic("Expected block to be non-nil") + } + bp.removePeer(request.peerId) + request.block = nil + request.peerId = "" + bp.numPending++ + + go requestRoutine(bp, height) +} + +func (bp *BlockPool) hasBlock(height uint) bool { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[height] + return request != nil && request.block != nil +} + +func (bp *BlockPool) setPeerForRequest(height uint, peerId string) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[height] + if request == nil { + return + } + request.peerId = peerId +} + +func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[block.Height] + if request == nil { + return + } + if request.peerId != peerId { + return + } + if request.block != nil { + return + } + request.block = block + bp.numPending-- +} + +func (bp *BlockPool) getPeer(peerId string) *bpPeer { + bp.peersMtx.Lock() // Lock + defer bp.peersMtx.Unlock() + + peer := bp.peers[peerId] + return peer +} + +// Sets the peer's blockchain height. +func (bp *BlockPool) SetPeerHeight(peerId string, height uint) { + bp.peersMtx.Lock() // Lock + defer bp.peersMtx.Unlock() + + peer := bp.peers[peerId] + if peer != nil { + peer.height = height + } else { + peer = &bpPeer{ + height: height, + id: peerId, + numRequests: 0, + } + bp.peers[peerId] = peer } } -func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { - // If peer is available and can provide something... - for height := bp.height; peer.available(); height++ { - blockInfo := bp.blockInfos[height] - if blockInfo == nil { - // We're out of range. - return - } - needsMorePeers := blockInfo.needsMorePeers() - alreadyAskedPeer := blockInfo.requests[peer.id] != nil - if needsMorePeers && !alreadyAskedPeer { - select { - case bp.requestsCh <- BlockRequest{height, peer.id}: - // Create a new request and start the timer. - request := &bpBlockRequest{ - height: height, - peer: peer, - } - blockInfo.requests[peer.id] = request - peer.requests[height] = request - request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries - default: - // The request cannot be made because requestCh is full. - // Just stop. - return - } - } - } +func (bp *BlockPool) RemovePeer(peerId string) { + bp.peersMtx.Lock() // Lock + defer bp.peersMtx.Unlock() + + delete(bp.peers, peerId) } -func (bp *BlockPool) makeMoreBlockInfos() { - // make more requests if necessary. - for i := 0; i < requestBatchSize; i++ { - //log.Debug("Confused?", - // "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests) - if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests { - // Make a request for the next block height - requestHeight := bp.height + uint(bp.numTotal) - log.Debug("New blockInfo", "height", requestHeight) - blockInfo := bpNewBlockInfo(requestHeight) - bp.blockInfos[requestHeight] = blockInfo - bp.numPending++ - bp.numTotal++ - } else { - break - } - } -} +// Pick an available peer with at least the given minHeight. +// If no peers are available, returns nil. +func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { + bp.peersMtx.Lock() + defer bp.peersMtx.Unlock() -func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer { - available := []*bpPeer{} for _, peer := range bp.peers { - if peer.available() { - available = append(available, peer) + if peer.numRequests >= maxRequestsPerPeer { + continue + } + if peer.height < minHeight { + continue + } + peer.numRequests++ + return peer + } + + return nil +} + +func (bp *BlockPool) decrPeer(peerId string) { + bp.peersMtx.Lock() + defer bp.peersMtx.Unlock() + + peer := bp.peers[peerId] + if peer == nil { + return + } + peer.numRequests-- +} + +func (bp *BlockPool) nextHeight() uint { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + return bp.height + uint(bp.numTotal) +} + +func (bp *BlockPool) makeRequest(height uint) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := &bpRequest{ + height: height, + peerId: "", + block: nil, + } + bp.requests[height] = request + + nextHeight := bp.height + uint(bp.numTotal) + if nextHeight == height { + bp.numTotal++ + bp.numPending++ + } + + go requestRoutine(bp, height) +} + +func (bp *BlockPool) sendRequest(height uint, peerId string) { + if atomic.LoadInt32(&bp.running) == 0 { + return + } + bp.requestsCh <- BlockRequest{height, peerId} +} + +func (bp *BlockPool) sendTimeout(peerId string) { + if atomic.LoadInt32(&bp.running) == 0 { + return + } + bp.timeoutsCh <- peerId +} + +func (bp *BlockPool) debug() string { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + str := "" + for h := bp.height; h < bp.height+uint(bp.numTotal); h++ { + if bp.requests[h] == nil { + str += Fmt("H(%v):X ", h) + } else { + str += Fmt("H(%v):", h) + str += Fmt("B?(%v) ", bp.requests[h].block != nil) } } - perm := rand.Perm(MinInt(choose, len(available))) - chosen := make([]*bpPeer, len(perm)) - for i, idx := range perm { - chosen[i] = available[idx] - } - return chosen -} - -// blocking -func (bp *BlockPool) pushBlocksFromStart() { - for height := bp.height; ; height++ { - // push block to blocksCh. - blockInfo := bp.blockInfos[height] - if blockInfo == nil || blockInfo.block == nil { - break - } - bp.numTotal-- - bp.height++ - delete(bp.blockInfos, height) - bp.blocksCh <- blockInfo.block - } -} - -//----------------------------------------------------------------------------- - -type bpBlockInfo struct { - height uint - requests map[string]*bpBlockRequest - block *types.Block // first block received - blockBy string // peerId of source -} - -func bpNewBlockInfo(height uint) *bpBlockInfo { - return &bpBlockInfo{ - height: height, - requests: make(map[string]*bpBlockRequest), - } -} - -func (blockInfo *bpBlockInfo) needsMorePeers() bool { - return len(blockInfo.requests) < maxPeersPerRequest -} - -//------------------------------------- - -type bpBlockRequest struct { - peer *bpPeer - height uint - block *types.Block - tries int -} - -// bump tries++ and set timeout. -// NOTE: the timer is unconditional. -func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) { - request.tries++ - time.AfterFunc(requestTimeoutSeconds*time.Second, func() { - eventsCh <- bpRequestTimeout{ - peerId: request.peer.id, - height: request.height, - } - }) + return str } //------------------------------------- type bpPeer struct { - id string - height uint - requests map[uint]*bpBlockRequest - // Count good/bad events from peer. - good uint - bad uint + id string + height uint + numRequests int32 } -func bpNewPeer(peerId string, height uint) *bpPeer { - return &bpPeer{ - id: peerId, - height: height, - requests: make(map[uint]*bpBlockRequest), - } -} - -func (peer *bpPeer) available() bool { - return len(peer.requests) < maxOutstandingRequestsPerPeer +type bpRequest struct { + height uint + peerId string + block *types.Block } //------------------------------------- -// bp.eventsCh messages -type bpBlockResponse struct { - block *types.Block - peerId string +// Responsible for making more requests as necessary +// Returns when a block is found (e.g. AddBlock() is called) +func requestRoutine(bp *BlockPool, height uint) { + for { + var peer *bpPeer = nil + PICK_LOOP: + for { + if !bp.IsRunning() { + return + } + peer = bp.pickIncrAvailablePeer(height) + if peer == nil { + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_LOOP + } + break PICK_LOOP + } + + bp.setPeerForRequest(height, peer.id) + + for try := 0; try < maxTries; try++ { + bp.sendRequest(height, peer.id) + time.Sleep(requestTimeoutSeconds * time.Second) + if bp.hasBlock(height) { + bp.decrPeer(peer.id) + return + } + bpHeight, _, _ := bp.GetStatus() + if height < bpHeight { + bp.decrPeer(peer.id) + return + } + } + + bp.RemovePeer(peer.id) + bp.sendTimeout(peer.id) + } } -type bpPeerStatus struct { - peerId string - height uint // blockchain tip of peer -} +//------------------------------------- -type bpRequestTimeout struct { - peerId string - height uint +type BlockRequest struct { + Height uint + PeerId string } diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index a2578aa13..c07a11d85 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -25,26 +25,34 @@ func makePeers(numPeers int, minHeight, maxHeight uint) map[string]testPeer { } func TestBasic(t *testing.T) { - // 100 peers anywhere at height 0 to 1000. - peers := makePeers(100, 0, 1000) - + peers := makePeers(10, 0, 1000) start := uint(42) - maxHeight := uint(300) timeoutsCh := make(chan string, 100) requestsCh := make(chan BlockRequest, 100) - blocksCh := make(chan *types.Block, 100) - - pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() // Introduce each peer. go func() { for _, peer := range peers { - pool.SetPeerStatus(peer.id, peer.height) + pool.SetPeerHeight(peer.id, peer.height) } }() - lastSeenBlock := uint(41) + // Start a goroutine to pull blocks + go func() { + for { + if !pool.IsRunning() { + return + } + first, second := pool.PeekTwoBlocks() + if first != nil && second != nil { + pool.PopRequest() + } else { + time.Sleep(1 * time.Second) + } + } + }() // Pull from channels for { @@ -53,21 +61,15 @@ func TestBasic(t *testing.T) { t.Errorf("timeout: %v", peerId) case request := <-requestsCh: log.Debug("TEST: Pulled new BlockRequest", "request", request) - // After a while, pretend like we got a block from the peer. + if request.Height == 300 { + return // Done! + } + // Request desired, pretend like we got the block immediately. go func() { block := &types.Block{Header: &types.Header{Height: request.Height}} pool.AddBlock(block, request.PeerId) log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId) }() - case block := <-blocksCh: - log.Debug("TEST: Pulled new Block", "height", block.Height) - if block.Height != lastSeenBlock+1 { - t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height) - } - lastSeenBlock++ - if block.Height == maxHeight { - return // Done! - } } } @@ -75,43 +77,52 @@ func TestBasic(t *testing.T) { } func TestTimeout(t *testing.T) { - origRequestTimeoutSeconds := requestTimeoutSeconds - requestTimeoutSeconds = time.Duration(0) - defer func() { requestTimeoutSeconds = origRequestTimeoutSeconds }() - - peers := makePeers(100, 0, 1000) + peers := makePeers(10, 0, 1000) start := uint(42) - timeoutsCh := make(chan string, 10) - requestsCh := make(chan BlockRequest, 10) - blocksCh := make(chan *types.Block, 100) - - pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + timeoutsCh := make(chan string, 100) + requestsCh := make(chan BlockRequest, 100) + pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() // Introduce each peer. go func() { for _, peer := range peers { - pool.SetPeerStatus(peer.id, peer.height) + pool.SetPeerHeight(peer.id, peer.height) + } + }() + + // Start a goroutine to pull blocks + go func() { + for { + if !pool.IsRunning() { + return + } + first, second := pool.PeekTwoBlocks() + if first != nil && second != nil { + pool.PopRequest() + } else { + time.Sleep(1 * time.Second) + } } }() // Pull from channels + counter := 0 + timedOut := map[string]struct{}{} for { select { case peerId := <-timeoutsCh: - // Timed out. Done! - if peers[peerId].id != peerId { - t.Errorf("Unexpected peer from timeoutsCh") + log.Debug("Timeout", "peerId", peerId) + if _, ok := timedOut[peerId]; !ok { + counter++ + if counter == len(peers) { + return // Done! + } } - return - case _ = <-requestsCh: - // Don't do anything, let it time out. - case _ = <-blocksCh: - t.Errorf("Got block when none expected") - return + case request := <-requestsCh: + log.Debug("TEST: Pulled new BlockRequest", "request", request) } } pool.Stop() - } From 0237d284cc673cd5a37d3022c920b84a868b986c Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 24 Mar 2015 12:00:27 -0700 Subject: [PATCH 03/15] Channel bytes are spelled fully, "XXXChannel" --- consensus/pol.go | 5 +++++ consensus/reactor.go | 42 +++++++++++++++++++++--------------------- consensus/state.go | 4 ++-- mempool/reactor.go | 8 ++++---- p2p/connection.go | 26 +++++++++++++++++++------- p2p/peer_set.go | 6 ++++++ p2p/pex_reactor.go | 8 ++++---- p2p/switch.go | 4 ++-- state/state_test.go | 6 +++--- types/block.go | 4 ++++ 10 files changed, 70 insertions(+), 43 deletions(-) diff --git a/consensus/pol.go b/consensus/pol.go index c87b4ee5d..06784d588 100644 --- a/consensus/pol.go +++ b/consensus/pol.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/tendermint/tendermint/account" + "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -94,3 +95,7 @@ func (pol *POL) StringShort() string { Fingerprint(pol.BlockHash), pol.BlockParts) } } + +func (pol *POL) MakePartSet() *types.PartSet { + return types.NewPartSetFromData(binary.BinaryBytes(pol)) +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 83c244995..6e6f33662 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -17,9 +17,9 @@ import ( ) const ( - StateCh = byte(0x20) - DataCh = byte(0x21) - VoteCh = byte(0x22) + StateChannel = byte(0x20) + DataChannel = byte(0x21) + VoteChannel = byte(0x22) peerStateKey = "ConsensusReactor.peerState" @@ -75,15 +75,15 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: StateCh, + Id: StateChannel, Priority: 5, }, &p2p.ChannelDescriptor{ - Id: DataCh, + Id: DataChannel, Priority: 5, }, &p2p.ChannelDescriptor{ - Id: VoteCh, + Id: VoteChannel, Priority: 5, }, } @@ -122,7 +122,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes) switch chId { - case StateCh: + case StateChannel: switch msg := msg_.(type) { case *NewRoundStepMessage: ps.ApplyNewRoundStepMessage(msg, rs) @@ -134,7 +134,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Ignore unknown message } - case DataCh: + case DataChannel: switch msg := msg_.(type) { case *Proposal: ps.SetHasProposal(msg) @@ -155,7 +155,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Ignore unknown message } - case VoteCh: + case VoteChannel: switch msg := msg_.(type) { case *VoteMessage: vote := msg.Vote @@ -192,7 +192,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte Type: vote.Type, Index: index, } - conR.sw.Broadcast(StateCh, msg) + conR.sw.Broadcast(StateChannel, msg) } default: @@ -252,10 +252,10 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - conR.sw.Broadcast(StateCh, nrsMsg) + conR.sw.Broadcast(StateChannel, nrsMsg) } if csMsg != nil { - conR.sw.Broadcast(StateCh, csMsg) + conR.sw.Broadcast(StateChannel, csMsg) } } } @@ -264,10 +264,10 @@ func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - peer.Send(StateCh, nrsMsg) + peer.Send(StateChannel, nrsMsg) } if csMsg != nil { - peer.Send(StateCh, nrsMsg) + peer.Send(StateChannel, nrsMsg) } } @@ -296,7 +296,7 @@ OUTER_LOOP: Type: partTypeProposalBlock, Part: part, } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalBlockPart(rs.Height, rs.Round, index) continue OUTER_LOOP } @@ -306,7 +306,7 @@ OUTER_LOOP: if 0 < prs.Height && prs.Height < rs.Height { //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray) if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { - // Ensure that the peer's PartSetHeaeder is correct + // Ensure that the peer's PartSetHeader is correct blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) if !blockMeta.Parts.Equals(prs.ProposalBlockParts) { log.Debug("Peer ProposalBlockParts mismatch, sleeping", @@ -329,7 +329,7 @@ OUTER_LOOP: Type: partTypeProposalBlock, Part: part, } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) continue OUTER_LOOP } else { @@ -349,7 +349,7 @@ OUTER_LOOP: // Send proposal? if rs.Proposal != nil && !prs.Proposal { msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal} - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposal(rs.Proposal) continue OUTER_LOOP } @@ -363,7 +363,7 @@ OUTER_LOOP: Type: partTypeProposalPOL, Part: rs.ProposalPOLParts.GetPart(index), } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalPOLPart(rs.Height, rs.Round, index) continue OUTER_LOOP } @@ -397,7 +397,7 @@ OUTER_LOOP: vote := voteSet.GetByIndex(index) // NOTE: vote may be a commit. msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) + peer.Send(VoteChannel, msg) ps.SetHasVote(vote, index) return true } @@ -421,7 +421,7 @@ OUTER_LOOP: Signature: commit.Signature, } msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) + peer.Send(VoteChannel, msg) ps.SetHasVote(vote, index) return true } diff --git a/consensus/state.go b/consensus/state.go index d44dd7b5a..5fb4268d8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -641,12 +641,12 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { return } - blockParts = types.NewPartSetFromData(binary.BinaryBytes(block)) + blockParts = block.MakePartSet() pol = cs.LockedPOL // If exists, is a PoUnlock. } if pol != nil { - polParts = types.NewPartSetFromData(binary.BinaryBytes(pol)) + polParts = pol.MakePartSet() } // Make proposal diff --git a/mempool/reactor.go b/mempool/reactor.go index 5bed4e18b..e16cf9332 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -11,7 +11,7 @@ import ( ) var ( - MempoolCh = byte(0x30) + MempoolChannel = byte(0x30) ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -52,7 +52,7 @@ func (memR *MempoolReactor) Stop() { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: MempoolCh, + Id: MempoolChannel, Priority: 5, }, } @@ -92,7 +92,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { if peer.Key == src.Key { continue } - peer.TrySend(MempoolCh, msg) + peer.TrySend(MempoolChannel, msg) } default: @@ -106,7 +106,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return err } msg := &TxMessage{Tx: tx} - memR.sw.Broadcast(MempoolCh, msg) + memR.sw.Broadcast(MempoolChannel, msg) return nil } diff --git a/p2p/connection.go b/p2p/connection.go index 89086bc2a..578dea1b3 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -50,8 +50,9 @@ There are two methods for sending messages: func (m MConnection) TrySend(chId byte, msg interface{}) bool {} `Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued -for the channel with the given id byte `chId`. The message `msg` is serialized -using the `tendermint/binary` submodule's `WriteBinary()` reflection routine. +for the channel with the given id byte `chId`, or until the request times out. +The message `msg` is serialized using the `tendermint/binary` submodule's +`WriteBinary()` reflection routine. `TrySend(chId, msg)` is a nonblocking call that returns false if the channel's queue is full. @@ -437,8 +438,19 @@ FOR_LOOP: //----------------------------------------------------------------------------- type ChannelDescriptor struct { - Id byte - Priority uint + Id byte + Priority uint + SendQueueCapacity uint + RecvBufferCapacity uint +} + +func (chDesc *ChannelDescriptor) FillDefaults() { + if chDesc.SendQueueCapacity == 0 { + chDesc.SendQueueCapacity = defaultSendQueueCapacity + } + if chDesc.RecvBufferCapacity == 0 { + chDesc.RecvBufferCapacity = defaultRecvBufferCapacity + } } // TODO: lowercase. @@ -448,7 +460,7 @@ type Channel struct { desc *ChannelDescriptor id byte sendQueue chan []byte - sendQueueSize uint32 + sendQueueSize uint32 // atomic. recving []byte sending []byte priority uint @@ -463,8 +475,8 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { conn: conn, desc: desc, id: desc.Id, - sendQueue: make(chan []byte, defaultSendQueueCapacity), - recving: make([]byte, 0, defaultRecvBufferCapacity), + sendQueue: make(chan []byte, desc.SendQueueCapacity), + recving: make([]byte, 0, desc.RecvBufferCapacity), priority: desc.Priority, } } diff --git a/p2p/peer_set.go b/p2p/peer_set.go index b4230ffa3..f365cd8ea 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -55,6 +55,12 @@ func (ps *PeerSet) Has(peerKey string) bool { return ok } +func (ps *PeerSet) Get(peerKey string) *Peer { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return ps.lookup[peerKey].peer +} + func (ps *PeerSet) Remove(peer *Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 90be9b24c..de742645f 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -14,7 +14,7 @@ import ( var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( - PexCh = byte(0x00) + PexChannel = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 maxNumPeers = 50 @@ -62,7 +62,7 @@ func (pexR *PEXReactor) Stop() { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexCh, + Id: PexChannel, Priority: 1, }, } @@ -122,11 +122,11 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { // Asks peer for more addresses. func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.Send(PexCh, &pexRequestMessage{}) + peer.Send(PexChannel, &pexRequestMessage{}) } func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs}) + peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs}) } // Ensures that sufficient peers are connected. (continuous) diff --git a/p2p/switch.go b/p2p/switch.go index 635e0ecaa..12267fa10 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -133,7 +133,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er // Send handshake msg := &pexHandshakeMessage{ChainId: sw.chainId} - peer.Send(PexCh, msg) + peer.Send(PexChannel, msg) return peer, nil } @@ -164,7 +164,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.String()) } -// Broadcast runs a go routine for each attemptted send, which will block +// Broadcast runs a go routine for each attempted send, which will block // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { diff --git a/state/state_test.go b/state/state_test.go index da7960a77..71efd80f2 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -91,7 +91,7 @@ func TestGenesisSaveLoad(t *testing.T) { // Make complete block and blockParts block := makeBlock(t, s0, nil, nil) - blockParts := types.NewPartSetFromData(binary.BinaryBytes(block)) + blockParts := block.MakePartSet() // Now append the block to s0. err := s0.AppendBlock(block, blockParts.Header()) @@ -338,7 +338,7 @@ func TestAddValidator(t *testing.T) { // Make complete block and blockParts block0 := makeBlock(t, s0, nil, []types.Tx{bondTx}) - block0Parts := types.NewPartSetFromData(binary.BinaryBytes(block0)) + block0Parts := block0.MakePartSet() // Sanity check if s0.BondedValidators.Size() != 1 { @@ -379,7 +379,7 @@ func TestAddValidator(t *testing.T) { }, }, nil, ) - block1Parts := types.NewPartSetFromData(binary.BinaryBytes(block1)) + block1Parts := block1.MakePartSet() err = s0.AppendBlock(block1, block1Parts.Header()) if err != nil { t.Error("Error appending secondary block:", err) diff --git a/types/block.go b/types/block.go index d56f4e467..11dfb3b9d 100644 --- a/types/block.go +++ b/types/block.go @@ -66,6 +66,10 @@ func (b *Block) Hash() []byte { return merkle.HashFromHashes(hashes) } +func (b *Block) MakePartSet() *PartSet { + return NewPartSetFromData(binary.BinaryBytes(b)) +} + // Convenience. // A nil block never hashes to anything. // Nothing hashes to a nil hash. From 612f8bab9d829d5eeabfe52af47327cb2deee0e9 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 24 Mar 2015 17:54:09 -0700 Subject: [PATCH 04/15] Fixed RepeatTimer race condition --- common/repeat_timer.go | 61 ++++++---- types/store.go | 247 ----------------------------------------- 2 files changed, 41 insertions(+), 267 deletions(-) delete mode 100644 types/store.go diff --git a/common/repeat_timer.go b/common/repeat_timer.go index de9b71fae..e2a5e1834 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -1,44 +1,65 @@ package common import "time" +import "sync" /* RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period. It's good for keeping connections alive. */ type RepeatTimer struct { - Name string - Ch chan struct{} - quit chan struct{} - dur time.Duration - timer *time.Timer + Ch chan time.Time + + mtx sync.Mutex + name string + ticker *time.Ticker + quit chan struct{} + dur time.Duration } func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { - var ch = make(chan struct{}) - var quit = make(chan struct{}) - var t = &RepeatTimer{Name: name, Ch: ch, dur: dur, quit: quit} - t.timer = time.AfterFunc(dur, t.fireRoutine) + var t = &RepeatTimer{ + Ch: make(chan time.Time), + ticker: time.NewTicker(dur), + quit: make(chan struct{}), + name: name, + dur: dur, + } + go t.fireRoutine(t.ticker) return t } -func (t *RepeatTimer) fireRoutine() { - select { - case t.Ch <- struct{}{}: - t.timer.Reset(t.dur) - case <-t.quit: - // do nothing - default: - t.timer.Reset(t.dur) +func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) { + for { + select { + case t_ := <-ticker.C: + t.Ch <- t_ + case <-t.quit: + return + } } } // Wait the duration again before firing. func (t *RepeatTimer) Reset() { - t.timer.Reset(t.dur) + t.mtx.Lock() // Lock + defer t.mtx.Unlock() + + if t.ticker != nil { + t.ticker.Stop() + } + t.ticker = time.NewTicker(t.dur) + go t.fireRoutine(t.ticker) } func (t *RepeatTimer) Stop() bool { - close(t.quit) - return t.timer.Stop() + t.mtx.Lock() // Lock + defer t.mtx.Unlock() + + exists := t.ticker != nil + if exists { + t.ticker.Stop() + t.ticker = nil + } + return exists } diff --git a/types/store.go b/types/store.go deleted file mode 100644 index 3afc8cb58..000000000 --- a/types/store.go +++ /dev/null @@ -1,247 +0,0 @@ -package types - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - - "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" - dbm "github.com/tendermint/tendermint/db" -) - -/* -Simple low level store for blocks. - -There are three types of information stored: - - BlockMeta: Meta information about each block - - Block part: Parts of each block, aggregated w/ PartSet - - Validation: The Validation part of each block, for gossiping commit votes - -Currently the commit signatures are duplicated in the Block parts as -well as the Validation. In the future this may change, perhaps by moving -the Validation data outside the Block. -*/ -type BlockStore struct { - height uint - db dbm.DB -} - -func NewBlockStore(db dbm.DB) *BlockStore { - bsjson := LoadBlockStoreStateJSON(db) - return &BlockStore{ - height: bsjson.Height, - db: db, - } -} - -// Height() returns the last known contiguous block height. -func (bs *BlockStore) Height() uint { - return bs.height -} - -func (bs *BlockStore) GetReader(key []byte) io.Reader { - bytez := bs.db.Get(key) - if bytez == nil { - return nil - } - return bytes.NewReader(bytez) -} - -func (bs *BlockStore) LoadBlock(height uint) *Block { - var n int64 - var err error - r := bs.GetReader(calcBlockMetaKey(height)) - if r == nil { - panic(Fmt("Block does not exist at height %v", height)) - } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) - if err != nil { - panic(Fmt("Error reading block meta: %v", err)) - } - bytez := []byte{} - for i := uint(0); i < meta.Parts.Total; i++ { - part := bs.LoadBlockPart(height, i) - bytez = append(bytez, part.Bytes...) - } - block := binary.ReadBinary(&Block{}, bytes.NewReader(bytez), &n, &err).(*Block) - if err != nil { - panic(Fmt("Error reading block: %v", err)) - } - return block -} - -func (bs *BlockStore) LoadBlockPart(height uint, index uint) *Part { - var n int64 - var err error - r := bs.GetReader(calcBlockPartKey(height, index)) - if r == nil { - panic(Fmt("BlockPart does not exist for height %v index %v", height, index)) - } - part := binary.ReadBinary(&Part{}, r, &n, &err).(*Part) - if err != nil { - panic(Fmt("Error reading block part: %v", err)) - } - return part -} - -func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta { - var n int64 - var err error - r := bs.GetReader(calcBlockMetaKey(height)) - if r == nil { - panic(Fmt("BlockMeta does not exist for height %v", height)) - } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) - if err != nil { - panic(Fmt("Error reading block meta: %v", err)) - } - return meta -} - -// NOTE: the Commit-vote heights are for the block at `height-1` -// Since these are included in the subsequent block, the height -// is off by 1. -func (bs *BlockStore) LoadBlockValidation(height uint) *Validation { - var n int64 - var err error - r := bs.GetReader(calcBlockValidationKey(height)) - if r == nil { - panic(Fmt("BlockValidation does not exist for height %v", height)) - } - validation := binary.ReadBinary(&Validation{}, r, &n, &err).(*Validation) - if err != nil { - panic(Fmt("Error reading validation: %v", err)) - } - return validation -} - -// NOTE: the Commit-vote heights are for the block at `height` -func (bs *BlockStore) LoadSeenValidation(height uint) *Validation { - var n int64 - var err error - r := bs.GetReader(calcSeenValidationKey(height)) - if r == nil { - panic(Fmt("SeenValidation does not exist for height %v", height)) - } - validation := binary.ReadBinary(&Validation{}, r, &n, &err).(*Validation) - if err != nil { - panic(Fmt("Error reading validation: %v", err)) - } - return validation -} - -// blockParts: Must be parts of the block -// seenValidation: The +2/3 commits that were seen which finalized the height. -// If all the nodes restart after committing a block, -// we need this to reload the commits to catch-up nodes to the -// most recent height. Otherwise they'd stall at H-1. -// Also good to have to debug consensus issues & punish wrong-signers -// whose commits weren't included in the block. -func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet, seenValidation *Validation) { - height := block.Height - if height != bs.height+1 { - panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height)) - } - if !blockParts.IsComplete() { - panic(Fmt("BlockStore can only save complete block part sets")) - } - - // Save block meta - meta := makeBlockMeta(block, blockParts) - metaBytes := binary.BinaryBytes(meta) - bs.db.Set(calcBlockMetaKey(height), metaBytes) - - // Save block parts - for i := uint(0); i < blockParts.Total(); i++ { - bs.saveBlockPart(height, i, blockParts.GetPart(i)) - } - - // Save block validation (duplicate and separate from the Block) - blockValidationBytes := binary.BinaryBytes(block.Validation) - bs.db.Set(calcBlockValidationKey(height), blockValidationBytes) - - // Save seen validation (seen +2/3 commits) - seenValidationBytes := binary.BinaryBytes(seenValidation) - bs.db.Set(calcSeenValidationKey(height), seenValidationBytes) - - // Save new BlockStoreStateJSON descriptor - BlockStoreStateJSON{Height: height}.Save(bs.db) - - // Done! - bs.height = height -} - -func (bs *BlockStore) saveBlockPart(height uint, index uint, part *Part) { - if height != bs.height+1 { - panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height)) - } - partBytes := binary.BinaryBytes(part) - bs.db.Set(calcBlockPartKey(height, index), partBytes) -} - -//----------------------------------------------------------------------------- - -type BlockMeta struct { - Hash []byte // The block hash - Header *Header // The block's Header - Parts PartSetHeader // The PartSetHeader, for transfer -} - -func makeBlockMeta(block *Block, blockParts *PartSet) *BlockMeta { - return &BlockMeta{ - Hash: block.Hash(), - Header: block.Header, - Parts: blockParts.Header(), - } -} - -//----------------------------------------------------------------------------- - -func calcBlockMetaKey(height uint) []byte { - return []byte(fmt.Sprintf("H:%v", height)) -} - -func calcBlockPartKey(height uint, partIndex uint) []byte { - return []byte(fmt.Sprintf("P:%v:%v", height, partIndex)) -} - -func calcBlockValidationKey(height uint) []byte { - return []byte(fmt.Sprintf("V:%v", height)) -} - -func calcSeenValidationKey(height uint) []byte { - return []byte(fmt.Sprintf("SV:%v", height)) -} - -//----------------------------------------------------------------------------- - -var blockStoreKey = []byte("blockStore") - -type BlockStoreStateJSON struct { - Height uint -} - -func (bsj BlockStoreStateJSON) Save(db dbm.DB) { - bytes, err := json.Marshal(bsj) - if err != nil { - panic(Fmt("Could not marshal state bytes: %v", err)) - } - db.Set(blockStoreKey, bytes) -} - -func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON { - bytes := db.Get(blockStoreKey) - if bytes == nil { - return BlockStoreStateJSON{ - Height: 0, - } - } - bsj := BlockStoreStateJSON{} - err := json.Unmarshal(bytes, &bsj) - if err != nil { - panic(Fmt("Could not unmarshal bytes: %X", bytes)) - } - return bsj -} From 08a83aa9fbb08d85543710a5c88c7cc7d58bf230 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 00:15:18 -0700 Subject: [PATCH 05/15] Reactors can be stopped or started at any time. --- blockchain/pool.go | 2 +- blockchain/reactor.go | 226 ++++++++++++++++++++++++++++++++++++++++++ blockchain/store.go | 24 +---- consensus/reactor.go | 5 +- consensus/state.go | 5 +- consensus/test.go | 4 +- daemon/daemon.go | 23 +++-- p2p/connection.go | 1 + p2p/peer_set.go | 1 + p2p/switch.go | 123 ++++++++++++----------- rpc/rpc.go | 6 +- state/state_test.go | 1 - types/block_meta.go | 15 +++ 13 files changed, 340 insertions(+), 96 deletions(-) create mode 100644 blockchain/reactor.go create mode 100644 types/block_meta.go diff --git a/blockchain/pool.go b/blockchain/pool.go index 3e3313bfb..099594c12 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -149,7 +149,7 @@ func (bp *BlockPool) RedoRequest(height uint) { if request.block == nil { panic("Expected block to be non-nil") } - bp.removePeer(request.peerId) + bp.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" bp.numPending++ diff --git a/blockchain/reactor.go b/blockchain/reactor.go new file mode 100644 index 000000000..1aba782ee --- /dev/null +++ b/blockchain/reactor.go @@ -0,0 +1,226 @@ +package blockchain + +import ( + "bytes" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +const ( + BlockchainChannel = byte(0x40) + defaultChannelCapacity = 100 + defaultSleepIntervalMS = 500 +) + +// BlockchainReactor handles long-term catchup syncing. +type BlockchainReactor struct { + sw *p2p.Switch + store *BlockStore + pool *BlockPool + requestsCh chan BlockRequest + timeoutsCh chan string + lastBlock *types.Block + quit chan struct{} + started uint32 + stopped uint32 +} + +func NewBlockchainReactor(store *BlockStore) *BlockchainReactor { + requestsCh := make(chan BlockRequest, defaultChannelCapacity) + timeoutsCh := make(chan string, defaultChannelCapacity) + pool := NewBlockPool( + store.Height()+1, + requestsCh, + timeoutsCh, + ) + bcR := &BlockchainReactor{ + store: store, + pool: pool, + requestsCh: requestsCh, + timeoutsCh: timeoutsCh, + quit: make(chan struct{}), + started: 0, + stopped: 0, + } + return bcR +} + +// Implements Reactor +func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { + if atomic.CompareAndSwapUint32(&bcR.started, 0, 1) { + log.Info("Starting BlockchainReactor") + bcR.sw = sw + bcR.pool.Start() + go bcR.poolRoutine() + } +} + +// Implements Reactor +func (bcR *BlockchainReactor) Stop() { + if atomic.CompareAndSwapUint32(&bcR.stopped, 0, 1) { + log.Info("Stopping BlockchainReactor") + close(bcR.quit) + bcR.pool.Stop() + } +} + +// Implements Reactor +func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + &p2p.ChannelDescriptor{ + Id: BlockchainChannel, + Priority: 5, + SendQueueCapacity: 20, // Queue 20 blocks to send to a peer. + }, + } +} + +// Implements Reactor +func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { + // Send peer our state. + peer.Send(BlockchainChannel, PeerStatusMessage{bcR.store.Height()}) +} + +// Implements Reactor +func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + // Remove peer from the pool. + bcR.pool.RemovePeer(peer.Key) +} + +// Implements Reactor +func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { + _, msg_, err := DecodeMessage(msgBytes) + if err != nil { + log.Warn("Error decoding message", "error", err) + return + } + log.Info("BlockchainReactor received message", "msg", msg_) + + switch msg := msg_.(type) { + case BlockRequestMessage: + log.Debug("Got BlockRequest", "msg", msg) + // Got a request for a block. Respond with block if we have it. + block := bcR.store.LoadBlock(msg.Height) + if block != nil { + msg := BlockResponseMessage{Block: block} + queued := src.TrySend(BlockchainChannel, msg) + if !queued { + // queue is full, just ignore. + } + } else { + // TODO peer is asking for things we don't have. + } + case BlockResponseMessage: + log.Debug("Got BlockResponse", "msg", msg) + // Got a block. + bcR.pool.AddBlock(msg.Block, src.Key) + case PeerStatusMessage: + log.Debug("Got PeerStatus", "msg", msg) + // Got a peer status. + bcR.pool.SetPeerHeight(src.Key, msg.Height) + default: + // Ignore unknown message + } +} + +func (bcR *BlockchainReactor) poolRoutine() { +FOR_LOOP: + for { + select { + case request := <-bcR.requestsCh: // chan BlockRequest + peer := bcR.sw.Peers().Get(request.PeerId) + if peer == nil { + // We can't fulfill the request. + continue FOR_LOOP + } + msg := BlockRequestMessage{request.Height} + queued := peer.TrySend(BlockchainChannel, msg) + if !queued { + // We couldn't queue the request. + time.Sleep(defaultSleepIntervalMS * time.Millisecond) + continue FOR_LOOP + } + case peerId := <-bcR.timeoutsCh: // chan string + // Peer timed out. + peer := bcR.sw.Peers().Get(peerId) + bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + case <-bcR.quit: + break FOR_LOOP + } + } +} + +func (bcR *BlockchainReactor) BroadcastStatus() error { + bcR.sw.Broadcast(BlockchainChannel, PeerStatusMessage{bcR.store.Height()}) + return nil +} + +//----------------------------------------------------------------------------- +// Messages + +const ( + msgTypeUnknown = byte(0x00) + msgTypeBlockRequest = byte(0x10) + msgTypeBlockResponse = byte(0x11) + msgTypePeerStatus = byte(0x20) +) + +// TODO: check for unnecessary extra bytes at the end. +func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) { + n := new(int64) + msgType = bz[0] + r := bytes.NewReader(bz) + switch msgType { + case msgTypeBlockRequest: + msg = binary.ReadBinary(BlockRequestMessage{}, r, n, &err) + case msgTypeBlockResponse: + msg = binary.ReadBinary(BlockResponseMessage{}, r, n, &err) + case msgTypePeerStatus: + msg = binary.ReadBinary(PeerStatusMessage{}, r, n, &err) + default: + msg = nil + } + return +} + +//------------------------------------- + +type BlockRequestMessage struct { + Height uint +} + +func (m BlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest } + +func (m BlockRequestMessage) String() string { + return fmt.Sprintf("[BlockRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type BlockResponseMessage struct { + Block *types.Block +} + +func (m BlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse } + +func (m BlockResponseMessage) String() string { + return fmt.Sprintf("[BlockResponseMessage %v]", m.Block.Height) +} + +//------------------------------------- + +type PeerStatusMessage struct { + Height uint +} + +func (m PeerStatusMessage) TypeByte() byte { return msgTypePeerStatus } + +func (m PeerStatusMessage) String() string { + return fmt.Sprintf("[PeerStatusMessage %v]", m.Height) +} diff --git a/blockchain/store.go b/blockchain/store.go index 8938273cb..f9d54cd23 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -57,7 +57,7 @@ func (bs *BlockStore) LoadBlock(height uint) *types.Block { if r == nil { panic(Fmt("Block does not exist at height %v", height)) } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) + meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta) if err != nil { panic(Fmt("Error reading block meta: %v", err)) } @@ -87,14 +87,14 @@ func (bs *BlockStore) LoadBlockPart(height uint, index uint) *types.Part { return part } -func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta { +func (bs *BlockStore) LoadBlockMeta(height uint) *types.BlockMeta { var n int64 var err error r := bs.GetReader(calcBlockMetaKey(height)) if r == nil { panic(Fmt("BlockMeta does not exist for height %v", height)) } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) + meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta) if err != nil { panic(Fmt("Error reading block meta: %v", err)) } @@ -150,7 +150,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s } // Save block meta - meta := makeBlockMeta(block, blockParts) + meta := types.NewBlockMeta(block, blockParts) metaBytes := binary.BinaryBytes(meta) bs.db.Set(calcBlockMetaKey(height), metaBytes) @@ -184,22 +184,6 @@ func (bs *BlockStore) saveBlockPart(height uint, index uint, part *types.Part) { //----------------------------------------------------------------------------- -type BlockMeta struct { - Hash []byte // The block hash - Header *types.Header // The block's Header - Parts types.PartSetHeader // The PartSetHeader, for transfer -} - -func makeBlockMeta(block *types.Block, blockParts *types.PartSet) *BlockMeta { - return &BlockMeta{ - Hash: block.Hash(), - Header: block.Header, - Parts: blockParts.Header(), - } -} - -//----------------------------------------------------------------------------- - func calcBlockMetaKey(height uint) []byte { return []byte(fmt.Sprintf("H:%v", height)) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 6e6f33662..7abd75ec1 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/binary" + bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" @@ -34,11 +35,11 @@ type ConsensusReactor struct { stopped uint32 quit chan struct{} - blockStore *types.BlockStore + blockStore *bc.BlockStore conS *ConsensusState } -func NewConsensusReactor(consensusState *ConsensusState, blockStore *types.BlockStore) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { conR := &ConsensusReactor{ blockStore: blockStore, quit: make(chan struct{}), diff --git a/consensus/state.go b/consensus/state.go index 5fb4268d8..ad8b79cf7 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -62,6 +62,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" + bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" . "github.com/tendermint/tendermint/consensus/types" @@ -234,7 +235,7 @@ type ConsensusState struct { stopped uint32 quit chan struct{} - blockStore *types.BlockStore + blockStore *bc.BlockStore mempoolReactor *mempl.MempoolReactor runActionCh chan RoundAction newStepCh chan *RoundState @@ -247,7 +248,7 @@ type ConsensusState struct { lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. } -func NewConsensusState(state *sm.State, blockStore *types.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { +func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { cs := &ConsensusState{ quit: make(chan struct{}), blockStore: blockStore, diff --git a/consensus/test.go b/consensus/test.go index e86c6a075..397befa0d 100644 --- a/consensus/test.go +++ b/consensus/test.go @@ -3,15 +3,15 @@ package consensus import ( "sort" + bc "github.com/tendermint/tendermint/blockchain" dbm "github.com/tendermint/tendermint/db" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/types" ) func randConsensusState() (*ConsensusState, []*sm.PrivValidator) { state, _, privValidators := sm.RandGenesisState(20, false, 1000, 10, false, 1000) - blockStore := types.NewBlockStore(dbm.NewMemDB()) + blockStore := bc.NewBlockStore(dbm.NewMemDB()) mempool := mempl.NewMempool(state) mempoolReactor := mempl.NewMempoolReactor(mempool) cs := NewConsensusState(state, blockStore, mempoolReactor) diff --git a/daemon/daemon.go b/daemon/daemon.go index ccb2932de..89e5de697 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" + bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" @@ -12,15 +13,15 @@ import ( "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/rpc" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/types" ) type Node struct { lz []p2p.Listener sw *p2p.Switch book *p2p.AddrBook + blockStore *bc.BlockStore pexReactor *p2p.PEXReactor - blockStore *types.BlockStore + bcReactor *bc.BlockchainReactor mempoolReactor *mempl.MempoolReactor consensusState *consensus.ConsensusState consensusReactor *consensus.ConsensusReactor @@ -30,7 +31,7 @@ type Node struct { func NewNode() *Node { // Get BlockStore blockStoreDB := dbm.GetDB("blockstore") - blockStore := types.NewBlockStore(blockStoreDB) + blockStore := bc.NewBlockStore(blockStoreDB) // Get State stateDB := dbm.GetDB("state") @@ -53,6 +54,9 @@ func NewNode() *Node { book := p2p.NewAddrBook(config.App().GetString("AddrBookFile")) pexReactor := p2p.NewPEXReactor(book) + // Get BlockchainReactor + bcReactor := bc.NewBlockchainReactor(blockStore) + // Get MempoolReactor mempool := mempl.NewMempool(state.Copy()) mempoolReactor := mempl.NewMempoolReactor(mempool) @@ -64,14 +68,19 @@ func NewNode() *Node { consensusReactor.SetPrivValidator(privValidator) } - sw := p2p.NewSwitch([]p2p.Reactor{pexReactor, mempoolReactor, consensusReactor}) + sw := p2p.NewSwitch() sw.SetChainId(state.Hash(), config.App().GetString("Network")) + sw.AddReactor("PEX", pexReactor) + //sw.AddReactor("BLOCKCHAIN", bcReactor) + sw.AddReactor("MEMPOOL", mempoolReactor) + sw.AddReactor("CONSENSUS", consensusReactor) return &Node{ sw: sw, book: book, - pexReactor: pexReactor, blockStore: blockStore, + pexReactor: pexReactor, + bcReactor: bcReactor, mempoolReactor: mempoolReactor, consensusState: consensusState, consensusReactor: consensusReactor, @@ -85,13 +94,13 @@ func (n *Node) Start() { go n.inboundConnectionRoutine(l) } n.book.Start() - n.sw.Start() + n.sw.StartAll() } func (n *Node) Stop() { log.Info("Stopping Node") // TODO: gracefully disconnect from peers. - n.sw.Stop() + n.sw.StopAll() n.book.Stop() } diff --git a/p2p/connection.go b/p2p/connection.go index 578dea1b3..0e26480f1 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -468,6 +468,7 @@ type Channel struct { } func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { + desc.FillDefaults() if desc.Priority <= 0 { panic("Channel default priority must be a postive integer") } diff --git a/p2p/peer_set.go b/p2p/peer_set.go index f365cd8ea..23f49c513 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -7,6 +7,7 @@ import ( // IPeerSet has a (immutable) subset of the methods of PeerSet. type IPeerSet interface { Has(key string) bool + Get(key string) *Peer List() []*Peer Size() int } diff --git a/p2p/switch.go b/p2p/switch.go index 12267fa10..27c61cb46 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -29,89 +29,100 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. */ type Switch struct { - reactors []Reactor + chainId string + reactors map[string]Reactor chDescs []*ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap - listeners *CMap // listenerName -> chan interface{} - quit chan struct{} - started uint32 - stopped uint32 - chainId string + listeners *CMap // listenerName -> chan interface{} + running uint32 // atomic } var ( - ErrSwitchStopped = errors.New("Switch already stopped") ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrSwitchStopped = errors.New("Switch stopped") ) const ( peerDialTimeoutSeconds = 3 ) -func NewSwitch(reactors []Reactor) *Switch { - - // Validate the reactors. no two reactors can share the same channel. - chDescs := []*ChannelDescriptor{} - reactorsByCh := make(map[byte]Reactor) - for _, reactor := range reactors { - reactorChannels := reactor.GetChannels() - for _, chDesc := range reactorChannels { - chId := chDesc.Id - if reactorsByCh[chId] != nil { - panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor)) - } - chDescs = append(chDescs, chDesc) - reactorsByCh[chId] = reactor - } - } +func NewSwitch() *Switch { sw := &Switch{ - reactors: reactors, - chDescs: chDescs, - reactorsByCh: reactorsByCh, + chainId: "", + reactors: make(map[string]Reactor), + chDescs: make([]*ChannelDescriptor, 0), + reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), listeners: NewCMap(), - quit: make(chan struct{}), - stopped: 0, + running: 0, } return sw } -func (sw *Switch) Start() { - if atomic.CompareAndSwapUint32(&sw.started, 0, 1) { - log.Info("Starting Switch") - for _, reactor := range sw.reactors { - reactor.Start(sw) +func (sw *Switch) SetChainId(hash []byte, network string) { + sw.chainId = hex.EncodeToString(hash) + "-" + network +} + +func (sw *Switch) AddReactor(name string, reactor Reactor) { + // Validate the reactor. + // No two reactors can share the same channel. + reactorChannels := reactor.GetChannels() + for _, chDesc := range reactorChannels { + chId := chDesc.Id + if sw.reactorsByCh[chId] != nil { + panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor)) } + sw.chDescs = append(sw.chDescs, chDesc) + sw.reactorsByCh[chId] = reactor + } + sw.reactors[name] = reactor + time.Sleep(1 * time.Second) +} + +func (sw *Switch) StartReactor(name string) { + atomic.StoreUint32(&sw.running, 1) + sw.reactors[name].Start(sw) +} + +// Convenience function +func (sw *Switch) StartAll() { + atomic.StoreUint32(&sw.running, 1) + for _, reactor := range sw.reactors { + reactor.Start(sw) } } -func (sw *Switch) Stop() { - if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) { - log.Info("Stopping Switch") - close(sw.quit) - // Stop each peer. - for _, peer := range sw.peers.List() { - peer.stop() - } - sw.peers = NewPeerSet() - // Stop all reactors. - for _, reactor := range sw.reactors { - reactor.Stop() - } +func (sw *Switch) StopReactor(name string) { + sw.reactors[name].Stop() +} + +// Convenience function +// Not goroutine safe +func (sw *Switch) StopAll() { + atomic.StoreUint32(&sw.running, 0) + // Stop each peer. + for _, peer := range sw.peers.List() { + peer.stop() + } + sw.peers = NewPeerSet() + // Stop all reactors. + for _, reactor := range sw.reactors { + reactor.Stop() } } -func (sw *Switch) Reactors() []Reactor { +// Not goroutine safe +func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - if atomic.LoadUint32(&sw.stopped) == 1 { + if atomic.LoadUint32(&sw.running) == 0 { return nil, ErrSwitchStopped } @@ -125,12 +136,12 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er return nil, ErrSwitchDuplicatePeer } - // Start the peer - go peer.start() - // Notify listeners. sw.doAddPeer(peer) + // Start the peer + go peer.start() + // Send handshake msg := &pexHandshakeMessage{ChainId: sw.chainId} peer.Send(PexChannel, msg) @@ -139,7 +150,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - if atomic.LoadUint32(&sw.stopped) == 1 { + if atomic.LoadUint32(&sw.running) == 0 { return nil, ErrSwitchStopped } @@ -168,7 +179,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { - if atomic.LoadUint32(&sw.stopped) == 1 { + if atomic.LoadUint32(&sw.running) == 0 { return nil } successChan := make(chan bool, len(sw.peers.List())) @@ -223,16 +234,12 @@ func (sw *Switch) StopPeerGracefully(peer *Peer) { sw.doRemovePeer(peer, nil) } -func (sw *Switch) SetChainId(hash []byte, network string) { - sw.chainId = hex.EncodeToString(hash) + "-" + network -} - func (sw *Switch) IsListening() bool { return sw.listeners.Size() > 0 } func (sw *Switch) doAddPeer(peer *Peer) { - for _, reactor := range sw.reactors { + for name, reactor := range sw.reactors { reactor.AddPeer(peer) } } diff --git a/rpc/rpc.go b/rpc/rpc.go index 94631249c..8cf905d3a 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -1,18 +1,18 @@ package rpc import ( + bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" ) -var blockStore *types.BlockStore +var blockStore *bc.BlockStore var consensusState *consensus.ConsensusState var mempoolReactor *mempl.MempoolReactor var p2pSwitch *p2p.Switch -func SetRPCBlockStore(bs *types.BlockStore) { +func SetRPCBlockStore(bs *bc.BlockStore) { blockStore = bs } diff --git a/state/state_test.go b/state/state_test.go index 71efd80f2..0d2c963bb 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -2,7 +2,6 @@ package state import ( "github.com/tendermint/tendermint/account" - "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" diff --git a/types/block_meta.go b/types/block_meta.go new file mode 100644 index 000000000..3e9ba8f91 --- /dev/null +++ b/types/block_meta.go @@ -0,0 +1,15 @@ +package types + +type BlockMeta struct { + Hash []byte // The block hash + Header *Header // The block's Header + Parts PartSetHeader // The PartSetHeader, for transfer +} + +func NewBlockMeta(block *Block, blockParts *PartSet) *BlockMeta { + return &BlockMeta{ + Hash: block.Hash(), + Header: block.Header, + Parts: blockParts.Header(), + } +} From aed4bbf0f0e236b28b76d9d456e91cb01de60103 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 02:36:59 -0700 Subject: [PATCH 06/15] Fix switch tests --- daemon/daemon.go | 4 +-- p2p/pex_reactor.go | 5 ++-- p2p/switch.go | 63 +++++++++++++++++++--------------------------- p2p/switch_test.go | 52 ++++++++++++++++++-------------------- 4 files changed, 56 insertions(+), 68 deletions(-) diff --git a/daemon/daemon.go b/daemon/daemon.go index 89e5de697..a4b573bee 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -94,13 +94,13 @@ func (n *Node) Start() { go n.inboundConnectionRoutine(l) } n.book.Start() - n.sw.StartAll() + n.sw.StartReactors() } func (n *Node) Stop() { log.Info("Stopping Node") // TODO: gracefully disconnect from peers. - n.sw.StopAll() + n.sw.Stop() n.book.Stop() } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index de742645f..926859316 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -62,8 +62,9 @@ func (pexR *PEXReactor) Stop() { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexChannel, - Priority: 1, + Id: PexChannel, + Priority: 1, + SendQueueCapacity: 10, }, } } diff --git a/p2p/switch.go b/p2p/switch.go index 27c61cb46..5216f77dd 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "sync/atomic" "time" . "github.com/tendermint/tendermint/common" @@ -35,13 +34,11 @@ type Switch struct { reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap - listeners *CMap // listenerName -> chan interface{} - running uint32 // atomic + listeners *CMap // listenerName -> chan interface{} } var ( ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchStopped = errors.New("Switch stopped") ) const ( @@ -58,17 +55,18 @@ func NewSwitch() *Switch { peers: NewPeerSet(), dialing: NewCMap(), listeners: NewCMap(), - running: 0, } return sw } +// Not goroutine safe. func (sw *Switch) SetChainId(hash []byte, network string) { sw.chainId = hex.EncodeToString(hash) + "-" + network } -func (sw *Switch) AddReactor(name string, reactor Reactor) { +// Not goroutine safe. +func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. // No two reactors can share the same channel. reactorChannels := reactor.GetChannels() @@ -81,51 +79,49 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) { sw.reactorsByCh[chId] = reactor } sw.reactors[name] = reactor - time.Sleep(1 * time.Second) + return reactor } -func (sw *Switch) StartReactor(name string) { - atomic.StoreUint32(&sw.running, 1) - sw.reactors[name].Start(sw) +func (sw *Switch) Reactor(name string) Reactor { + return sw.reactors[name] } // Convenience function -func (sw *Switch) StartAll() { - atomic.StoreUint32(&sw.running, 1) +func (sw *Switch) StartReactors() { for _, reactor := range sw.reactors { reactor.Start(sw) } } -func (sw *Switch) StopReactor(name string) { - sw.reactors[name].Stop() -} - // Convenience function -// Not goroutine safe -func (sw *Switch) StopAll() { - atomic.StoreUint32(&sw.running, 0) - // Stop each peer. - for _, peer := range sw.peers.List() { - peer.stop() - } - sw.peers = NewPeerSet() +func (sw *Switch) StopReactors() { // Stop all reactors. for _, reactor := range sw.reactors { reactor.Stop() } } -// Not goroutine safe +// Convenience function +func (sw *Switch) StopPeers() { + // Stop each peer. + for _, peer := range sw.peers.List() { + peer.stop() + } + sw.peers = NewPeerSet() +} + +// Convenience function +func (sw *Switch) Stop() { + sw.StopPeers() + sw.StopReactors() +} + +// Not goroutine safe to modify. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - if atomic.LoadUint32(&sw.running) == 0 { - return nil, ErrSwitchStopped - } - peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers @@ -150,10 +146,6 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - if atomic.LoadUint32(&sw.running) == 0 { - return nil, ErrSwitchStopped - } - log.Debug("Dialing address", "address", addr) sw.dialing.Set(addr.String(), addr) conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) @@ -179,9 +171,6 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { - if atomic.LoadUint32(&sw.running) == 0 { - return nil - } successChan := make(chan bool, len(sw.peers.List())) log.Debug("Broadcast", "channel", chId, "msg", msg) for _, peer := range sw.peers.List() { @@ -239,7 +228,7 @@ func (sw *Switch) IsListening() bool { } func (sw *Switch) doAddPeer(peer *Peer) { - for name, reactor := range sw.reactors { + for _, reactor := range sw.reactors { reactor.AddPeer(peer) } } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index a260df27a..f486230f5 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -68,12 +68,12 @@ func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { //----------------------------------------------------------------------------- -// convenience method for creating two switches connected to each other. -func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) { +// convenience method for creating bar switches connected to each other. +func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) { - // Create two switches that will be interconnected. - s1 := NewSwitch(reactorsGenerator()) - s2 := NewSwitch(reactorsGenerator()) + // Create bar switches that will be interconnected. + s1 := initSwitch(NewSwitch()) + s2 := initSwitch(NewSwitch()) // Create a listener for s1 l := NewDefaultListener("tcp", ":8001", true) @@ -104,18 +104,17 @@ func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, } func TestSwitches(t *testing.T) { - s1, s2 := makeSwitchPair(t, func() []Reactor { - // Make two reactors of two channels each - reactors := make([]Reactor, 2) - reactors[0] = NewTestReactor([]*ChannelDescriptor{ + s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch { + // Make bar reactors of bar channels each + sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, true) - reactors[1] = NewTestReactor([]*ChannelDescriptor{ + }, true)).Start(sw) // Start the reactor + sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, true) - return reactors + }, true)).Start(sw) // Start the reactor + return sw }) defer s1.Stop() defer s2.Stop() @@ -129,8 +128,8 @@ func TestSwitches(t *testing.T) { } ch0Msg := "channel zero" - ch1Msg := "channel one" - ch2Msg := "channel two" + ch1Msg := "channel foo" + ch2Msg := "channel bar" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) @@ -140,7 +139,7 @@ func TestSwitches(t *testing.T) { time.Sleep(5000 * time.Millisecond) // Check message on ch0 - ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)] + ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)] if len(ch0Msgs) != 2 { t.Errorf("Expected to have received 1 message in ch0") } @@ -149,7 +148,7 @@ func TestSwitches(t *testing.T) { } // Check message on ch1 - ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)] + ch1Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x01)] if len(ch1Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch1") } @@ -158,7 +157,7 @@ func TestSwitches(t *testing.T) { } // Check message on ch2 - ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)] + ch2Msgs := s2.Reactor("bar").(*TestReactor).msgsReceived[byte(0x02)] if len(ch2Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch2") } @@ -172,18 +171,17 @@ func BenchmarkSwitches(b *testing.B) { b.StopTimer() - s1, s2 := makeSwitchPair(b, func() []Reactor { - // Make two reactors of two channels each - reactors := make([]Reactor, 2) - reactors[0] = NewTestReactor([]*ChannelDescriptor{ + s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch { + // Make bar reactors of bar channels each + sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, false) - reactors[1] = NewTestReactor([]*ChannelDescriptor{ + }, false)) + sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, false) - return reactors + }, false)) + return sw }) defer s1.Stop() defer s2.Stop() @@ -194,7 +192,7 @@ func BenchmarkSwitches(b *testing.B) { numSuccess, numFailure := 0, 0 - // Send random message from one channel to another + // Send random message from foo channel to another for i := 0; i < b.N; i++ { chId := byte(i % 4) successChan := s1.Broadcast(chId, "test data") From cebfae60c7d1c30f1a3764eccb27b93fd9c4fa97 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 11:33:39 -0700 Subject: [PATCH 07/15] BlockchainReactor syncs first before ConsensusReactor. --- blockchain/pool.go | 214 ++++++++++++++++++++--------------------- blockchain/reactor.go | 60 +++++++++++- consensus/reactor.go | 2 + consensus/state.go | 2 + consensus/vote_set.go | 2 +- daemon/daemon.go | 10 +- p2p/peer_set.go | 7 +- state/validator_set.go | 47 +++++++++ types/block.go | 4 +- 9 files changed, 232 insertions(+), 116 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 099594c12..192165bf7 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -60,32 +60,32 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- } } -func (bp *BlockPool) Start() { - if atomic.CompareAndSwapInt32(&bp.running, 0, 1) { +func (pool *BlockPool) Start() { + if atomic.CompareAndSwapInt32(&pool.running, 0, 1) { log.Info("Starting BlockPool") - go bp.run() + go pool.run() } } -func (bp *BlockPool) Stop() { - if atomic.CompareAndSwapInt32(&bp.running, 1, 0) { +func (pool *BlockPool) Stop() { + if atomic.CompareAndSwapInt32(&pool.running, 1, 0) { log.Info("Stopping BlockPool") - bp.repeater.Stop() + pool.repeater.Stop() } } -func (bp *BlockPool) IsRunning() bool { - return atomic.LoadInt32(&bp.running) == 1 +func (pool *BlockPool) IsRunning() bool { + return atomic.LoadInt32(&pool.running) == 1 } // Run spawns requests as needed. -func (bp *BlockPool) run() { +func (pool *BlockPool) run() { RUN_LOOP: for { - if atomic.LoadInt32(&bp.running) == 0 { + if atomic.LoadInt32(&pool.running) == 0 { break RUN_LOOP } - height, numPending, numTotal := bp.GetStatus() + height, numPending, numTotal := pool.GetStatus() log.Debug("BlockPool.run", "height", height, "numPending", numPending, "numTotal", numTotal) if numPending >= maxPendingRequests { @@ -96,91 +96,91 @@ RUN_LOOP: time.Sleep(requestIntervalMS * time.Millisecond) } else { // request for more blocks. - height := bp.nextHeight() - bp.makeRequest(height) + height := pool.nextHeight() + pool.makeRequest(height) } } } -func (bp *BlockPool) GetStatus() (uint, int32, int32) { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) GetStatus() (uint, int32, int32) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - return bp.height, bp.numPending, bp.numTotal + return pool.height, pool.numPending, pool.numTotal } // We need to see the second block's Validation to validate the first block. // So we peek two blocks at a time. -func (bp *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - if r := bp.requests[bp.height]; r != nil { + if r := pool.requests[pool.height]; r != nil { first = r.block } - if r := bp.requests[bp.height+1]; r != nil { + if r := pool.requests[pool.height+1]; r != nil { second = r.block } return } -// Pop the first block at bp.height +// Pop the first block at pool.height // It must have been validated by 'second'.Validation from PeekTwoBlocks(). -func (bp *BlockPool) PopRequest() { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) PopRequest() { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - if r := bp.requests[bp.height]; r == nil || r.block == nil { + if r := pool.requests[pool.height]; r == nil || r.block == nil { panic("PopRequest() requires a valid block") } - delete(bp.requests, bp.height) - bp.height++ - bp.numTotal-- + delete(pool.requests, pool.height) + pool.height++ + pool.numTotal-- } -// Invalidates the block at bp.height. +// Invalidates the block at pool.height. // Remove the peer and request from others. -func (bp *BlockPool) RedoRequest(height uint) { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) RedoRequest(height uint) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - request := bp.requests[height] + request := pool.requests[height] if request.block == nil { panic("Expected block to be non-nil") } - bp.RemovePeer(request.peerId) // Lock on peersMtx. + pool.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" - bp.numPending++ + pool.numPending++ - go requestRoutine(bp, height) + go requestRoutine(pool, height) } -func (bp *BlockPool) hasBlock(height uint) bool { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) hasBlock(height uint) bool { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - request := bp.requests[height] + request := pool.requests[height] return request != nil && request.block != nil } -func (bp *BlockPool) setPeerForRequest(height uint, peerId string) { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - request := bp.requests[height] + request := pool.requests[height] if request == nil { return } request.peerId = peerId } -func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - request := bp.requests[block.Height] + request := pool.requests[block.Height] if request == nil { return } @@ -191,23 +191,23 @@ func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { return } request.block = block - bp.numPending-- + pool.numPending-- } -func (bp *BlockPool) getPeer(peerId string) *bpPeer { - bp.peersMtx.Lock() // Lock - defer bp.peersMtx.Unlock() +func (pool *BlockPool) getPeer(peerId string) *bpPeer { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() - peer := bp.peers[peerId] + peer := pool.peers[peerId] return peer } // Sets the peer's blockchain height. -func (bp *BlockPool) SetPeerHeight(peerId string, height uint) { - bp.peersMtx.Lock() // Lock - defer bp.peersMtx.Unlock() +func (pool *BlockPool) SetPeerHeight(peerId string, height uint) { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() - peer := bp.peers[peerId] + peer := pool.peers[peerId] if peer != nil { peer.height = height } else { @@ -216,24 +216,24 @@ func (bp *BlockPool) SetPeerHeight(peerId string, height uint) { id: peerId, numRequests: 0, } - bp.peers[peerId] = peer + pool.peers[peerId] = peer } } -func (bp *BlockPool) RemovePeer(peerId string) { - bp.peersMtx.Lock() // Lock - defer bp.peersMtx.Unlock() +func (pool *BlockPool) RemovePeer(peerId string) { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() - delete(bp.peers, peerId) + delete(pool.peers, peerId) } // Pick an available peer with at least the given minHeight. // If no peers are available, returns nil. -func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { - bp.peersMtx.Lock() - defer bp.peersMtx.Unlock() +func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() - for _, peer := range bp.peers { + for _, peer := range pool.peers { if peer.numRequests >= maxRequestsPerPeer { continue } @@ -247,69 +247,69 @@ func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { return nil } -func (bp *BlockPool) decrPeer(peerId string) { - bp.peersMtx.Lock() - defer bp.peersMtx.Unlock() +func (pool *BlockPool) decrPeer(peerId string) { + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() - peer := bp.peers[peerId] + peer := pool.peers[peerId] if peer == nil { return } peer.numRequests-- } -func (bp *BlockPool) nextHeight() uint { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) nextHeight() uint { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() - return bp.height + uint(bp.numTotal) + return pool.height + uint(pool.numTotal) } -func (bp *BlockPool) makeRequest(height uint) { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) makeRequest(height uint) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() request := &bpRequest{ height: height, peerId: "", block: nil, } - bp.requests[height] = request + pool.requests[height] = request - nextHeight := bp.height + uint(bp.numTotal) + nextHeight := pool.height + uint(pool.numTotal) if nextHeight == height { - bp.numTotal++ - bp.numPending++ + pool.numTotal++ + pool.numPending++ } - go requestRoutine(bp, height) + go requestRoutine(pool, height) } -func (bp *BlockPool) sendRequest(height uint, peerId string) { - if atomic.LoadInt32(&bp.running) == 0 { +func (pool *BlockPool) sendRequest(height uint, peerId string) { + if atomic.LoadInt32(&pool.running) == 0 { return } - bp.requestsCh <- BlockRequest{height, peerId} + pool.requestsCh <- BlockRequest{height, peerId} } -func (bp *BlockPool) sendTimeout(peerId string) { - if atomic.LoadInt32(&bp.running) == 0 { +func (pool *BlockPool) sendTimeout(peerId string) { + if atomic.LoadInt32(&pool.running) == 0 { return } - bp.timeoutsCh <- peerId + pool.timeoutsCh <- peerId } -func (bp *BlockPool) debug() string { - bp.requestsMtx.Lock() // Lock - defer bp.requestsMtx.Unlock() +func (pool *BlockPool) debug() string { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() str := "" - for h := bp.height; h < bp.height+uint(bp.numTotal); h++ { - if bp.requests[h] == nil { + for h := pool.height; h < pool.height+uint(pool.numTotal); h++ { + if pool.requests[h] == nil { str += Fmt("H(%v):X ", h) } else { str += Fmt("H(%v):", h) - str += Fmt("B?(%v) ", bp.requests[h].block != nil) + str += Fmt("B?(%v) ", pool.requests[h].block != nil) } } return str @@ -333,15 +333,15 @@ type bpRequest struct { // Responsible for making more requests as necessary // Returns when a block is found (e.g. AddBlock() is called) -func requestRoutine(bp *BlockPool, height uint) { +func requestRoutine(pool *BlockPool, height uint) { for { var peer *bpPeer = nil PICK_LOOP: for { - if !bp.IsRunning() { + if !pool.IsRunning() { return } - peer = bp.pickIncrAvailablePeer(height) + peer = pool.pickIncrAvailablePeer(height) if peer == nil { time.Sleep(requestIntervalMS * time.Millisecond) continue PICK_LOOP @@ -349,24 +349,24 @@ func requestRoutine(bp *BlockPool, height uint) { break PICK_LOOP } - bp.setPeerForRequest(height, peer.id) + pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { - bp.sendRequest(height, peer.id) + pool.sendRequest(height, peer.id) time.Sleep(requestTimeoutSeconds * time.Second) - if bp.hasBlock(height) { - bp.decrPeer(peer.id) + if pool.hasBlock(height) { + pool.decrPeer(peer.id) return } - bpHeight, _, _ := bp.GetStatus() + bpHeight, _, _ := pool.GetStatus() if height < bpHeight { - bp.decrPeer(peer.id) + pool.decrPeer(peer.id) return } } - bp.RemovePeer(peer.id) - bp.sendTimeout(peer.id) + pool.RemovePeer(peer.id) + pool.sendTimeout(peer.id) } } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 1aba782ee..e5976877e 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -8,7 +8,9 @@ import ( "time" "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -16,11 +18,17 @@ const ( BlockchainChannel = byte(0x40) defaultChannelCapacity = 100 defaultSleepIntervalMS = 500 + trySyncIntervalMS = 100 + + // stop syncing when last block's time is + // within this much of the system time. + stopSyncingDurationMinutes = 10 ) // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { sw *p2p.Switch + state *sm.State store *BlockStore pool *BlockPool requestsCh chan BlockRequest @@ -31,7 +39,10 @@ type BlockchainReactor struct { stopped uint32 } -func NewBlockchainReactor(store *BlockStore) *BlockchainReactor { +func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor { + if state.LastBlockHeight != store.Height() { + panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) + } requestsCh := make(chan BlockRequest, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity) pool := NewBlockPool( @@ -40,6 +51,7 @@ func NewBlockchainReactor(store *BlockStore) *BlockchainReactor { timeoutsCh, ) bcR := &BlockchainReactor{ + state: state, store: store, pool: pool, requestsCh: requestsCh, @@ -129,7 +141,11 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) } } +// Handle messages from the poolReactor telling the reactor what to do. func (bcR *BlockchainReactor) poolRoutine() { + + trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + FOR_LOOP: for { select { @@ -150,6 +166,48 @@ FOR_LOOP: // Peer timed out. peer := bcR.sw.Peers().Get(peerId) bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + case _ = <-trySyncTicker.C: // chan time + var lastValidatedBlock *types.Block + SYNC_LOOP: + for i := 0; i < 10; i++ { + // See if there are any blocks to sync. + first, second := bcR.pool.PeekTwoBlocks() + if first == nil || second == nil { + // We need both to sync the first block. + break SYNC_LOOP + } + firstParts := first.MakePartSet().Header() + // Finally, verify the first block using the second's validation. + err := bcR.state.BondedValidators.VerifyValidation( + first.Hash(), firstParts, first.Height, second.Validation) + if err != nil { + bcR.pool.RedoRequest(first.Height) + break SYNC_LOOP + } else { + bcR.pool.PopRequest() + err := bcR.state.AppendBlock(first, firstParts) + if err != nil { + // TODO This is bad, are we zombie? + panic(Fmt("Failed to process committed block: %v", err)) + } + lastValidatedBlock = first + } + } + // We're done syncing for now (will do again shortly) + // See if we want to stop syncing and turn on the + // consensus reactor. + // TODO: use other heuristics too besides blocktime. + // It's not a security concern, as it only needs to happen + // upon node sync, and there's also a second (slower) + // method of syncing in the consensus reactor. + if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { + go func() { + bcR.sw.Reactor("BLOCKCHAIN").Stop() + bcR.sw.Reactor("CONSENSUS").Start(bcR.sw) + }() + break FOR_LOOP + } + continue FOR_LOOP case <-bcR.quit: break FOR_LOOP } diff --git a/consensus/reactor.go b/consensus/reactor.go index 7abd75ec1..bbc39c89c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -29,6 +29,8 @@ const ( //----------------------------------------------------------------------------- +// The reactor's underlying ConsensusState may change state at any time. +// We atomically copy the RoundState struct before using it. type ConsensusReactor struct { sw *p2p.Switch started uint32 diff --git a/consensus/state.go b/consensus/state.go index ad8b79cf7..e2738ca98 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -467,6 +467,8 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // Reset fields based on state. validators := state.BondedValidators height := state.LastBlockHeight + 1 // next desired block height + + // RoundState fields cs.Height = height cs.Round = 0 cs.Step = RoundStepNewHeight diff --git a/consensus/vote_set.go b/consensus/vote_set.go index c640fc51b..973d4b7a1 100644 --- a/consensus/vote_set.go +++ b/consensus/vote_set.go @@ -34,7 +34,7 @@ type VoteSet struct { maj23Exists bool } -// Constructs a new VoteSet struct used to accumulate votes for each round. +// Constructs a new VoteSet struct used to accumulate votes for given height/round. func NewVoteSet(height uint, round uint, type_ byte, valSet *sm.ValidatorSet) *VoteSet { if height == 0 { panic("Cannot make VoteSet for height == 0, doesn't make sense.") diff --git a/daemon/daemon.go b/daemon/daemon.go index a4b573bee..fae9c9792 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -55,7 +55,7 @@ func NewNode() *Node { pexReactor := p2p.NewPEXReactor(book) // Get BlockchainReactor - bcReactor := bc.NewBlockchainReactor(blockStore) + bcReactor := bc.NewBlockchainReactor(state, blockStore) // Get MempoolReactor mempool := mempl.NewMempool(state.Copy()) @@ -70,10 +70,10 @@ func NewNode() *Node { sw := p2p.NewSwitch() sw.SetChainId(state.Hash(), config.App().GetString("Network")) - sw.AddReactor("PEX", pexReactor) - //sw.AddReactor("BLOCKCHAIN", bcReactor) - sw.AddReactor("MEMPOOL", mempoolReactor) - sw.AddReactor("CONSENSUS", consensusReactor) + sw.AddReactor("PEX", pexReactor).Start(sw) + sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw) + sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw) + sw.AddReactor("CONSENSUS", consensusReactor) // Do not start yet. return &Node{ sw: sw, diff --git a/p2p/peer_set.go b/p2p/peer_set.go index 23f49c513..effad6dcc 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -59,7 +59,12 @@ func (ps *PeerSet) Has(peerKey string) bool { func (ps *PeerSet) Get(peerKey string) *Peer { ps.mtx.Lock() defer ps.mtx.Unlock() - return ps.lookup[peerKey].peer + item, ok := ps.lookup[peerKey] + if ok { + return item.peer + } else { + return nil + } } func (ps *PeerSet) Remove(peer *Peer) { diff --git a/state/validator_set.go b/state/validator_set.go index 50f76f423..09589a9db 100644 --- a/state/validator_set.go +++ b/state/validator_set.go @@ -2,12 +2,15 @@ package state import ( "bytes" + "errors" "fmt" "sort" "strings" + "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/merkle" + "github.com/tendermint/tendermint/types" ) // ValidatorSet represent a set of *Validator at a given height. @@ -198,6 +201,50 @@ func (valSet *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) { } } +// Verify that +2/3 of the set had signed the given signBytes +func (valSet *ValidatorSet) VerifyValidation(hash []byte, parts types.PartSetHeader, height uint, v *types.Validation) error { + if valSet.Size() != uint(len(v.Commits)) { + return errors.New(Fmt("Invalid validation -- wrong set size: %v vs %v", + valSet.Size(), len(v.Commits))) + } + + talliedVotingPower := uint64(0) + seenValidators := map[string]struct{}{} + + for idx, commit := range v.Commits { + // may be zero, in which case skip. + if commit.Signature.IsZero() { + continue + } + _, val := valSet.GetByIndex(uint(idx)) + commitSignBytes := account.SignBytes(&types.Vote{ + Height: height, Round: commit.Round, Type: types.VoteTypeCommit, + BlockHash: hash, + BlockParts: parts, + }) + + // Validate + if _, seen := seenValidators[string(val.Address)]; seen { + return Errorf("Duplicate validator for commit %v for Validation %v", commit, v) + } + + if !val.PubKey.VerifyBytes(commitSignBytes, commit.Signature) { + return Errorf("Invalid signature for commit %v for Validation %v", commit, v) + } + + // Tally + seenValidators[string(val.Address)] = struct{}{} + talliedVotingPower += val.VotingPower + } + + if talliedVotingPower > valSet.TotalVotingPower()*2/3 { + return nil + } else { + return Errorf("insufficient voting power %v, needed %v", + talliedVotingPower, (valSet.TotalVotingPower()*2/3 + 1)) + } +} + func (valSet *ValidatorSet) String() string { return valSet.StringIndented("") } diff --git a/types/block.go b/types/block.go index 11dfb3b9d..176ce4385 100644 --- a/types/block.go +++ b/types/block.go @@ -39,7 +39,9 @@ func (b *Block) ValidateBasic(lastBlockHeight uint, lastBlockHash []byte, if !b.LastBlockParts.Equals(lastBlockParts) { return errors.New("Wrong Block.Header.LastBlockParts") } - /* TODO: Determine bounds. + /* TODO: Determine bounds + See blockchain/reactor "stopSyncingDurationMinutes" + if !b.Time.After(lastBlockTime) { return errors.New("Invalid Block.Header.Time") } From 94c3a5176082ed8e7590732f7b089ec8d9b6c4c5 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 12:13:32 -0700 Subject: [PATCH 08/15] bug fix in daemon -- network name only, no chain hash --- daemon/daemon.go | 2 +- p2p/pex_reactor.go | 8 ++++---- p2p/switch.go | 11 +++++------ 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/daemon/daemon.go b/daemon/daemon.go index fae9c9792..ddc310762 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -69,7 +69,7 @@ func NewNode() *Node { } sw := p2p.NewSwitch() - sw.SetChainId(state.Hash(), config.App().GetString("Network")) + sw.SetNetwork(config.App().GetString("Network")) sw.AddReactor("PEX", pexReactor).Start(sw) sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw) sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw) diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 926859316..439b4cf9b 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -98,9 +98,9 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { switch msg.(type) { case *pexHandshakeMessage: - chainId := msg.(*pexHandshakeMessage).ChainId - if chainId != pexR.sw.chainId { - err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", chainId, pexR.sw.chainId) + network := msg.(*pexHandshakeMessage).Network + if network != pexR.sw.network { + err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network) pexR.sw.StopPeerForError(src, err) } case *pexRequestMessage: @@ -238,7 +238,7 @@ func DecodeMessage(bz []byte) (msg interface{}, err error) { A pexHandshakeMessage contains the peer's chainId */ type pexHandshakeMessage struct { - ChainId string + Network string } func (m *pexHandshakeMessage) TypeByte() byte { return msgTypeHandshake } diff --git a/p2p/switch.go b/p2p/switch.go index 5216f77dd..10da29f36 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1,7 +1,6 @@ package p2p import ( - "encoding/hex" "errors" "fmt" "net" @@ -28,7 +27,7 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. */ type Switch struct { - chainId string + network string reactors map[string]Reactor chDescs []*ChannelDescriptor reactorsByCh map[byte]Reactor @@ -48,7 +47,7 @@ const ( func NewSwitch() *Switch { sw := &Switch{ - chainId: "", + network: "", reactors: make(map[string]Reactor), chDescs: make([]*ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), @@ -61,8 +60,8 @@ func NewSwitch() *Switch { } // Not goroutine safe. -func (sw *Switch) SetChainId(hash []byte, network string) { - sw.chainId = hex.EncodeToString(hash) + "-" + network +func (sw *Switch) SetNetwork(network string) { + sw.network = network } // Not goroutine safe. @@ -139,7 +138,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er go peer.start() // Send handshake - msg := &pexHandshakeMessage{ChainId: sw.chainId} + msg := &pexHandshakeMessage{Network: sw.network} peer.Send(PexChannel, msg) return peer, nil From 938eda979bf71f36542453f49a9d9d69f1d76bc5 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 12:21:52 -0700 Subject: [PATCH 09/15] fix bug, peer may be nil --- blockchain/reactor.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index e5976877e..f80abbe34 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -112,11 +112,10 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) log.Warn("Error decoding message", "error", err) return } - log.Info("BlockchainReactor received message", "msg", msg_) + log.Debug("BlockchainReactor received message", "msg", msg_) switch msg := msg_.(type) { case BlockRequestMessage: - log.Debug("Got BlockRequest", "msg", msg) // Got a request for a block. Respond with block if we have it. block := bcR.store.LoadBlock(msg.Height) if block != nil { @@ -129,11 +128,9 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) // TODO peer is asking for things we don't have. } case BlockResponseMessage: - log.Debug("Got BlockResponse", "msg", msg) // Got a block. bcR.pool.AddBlock(msg.Block, src.Key) case PeerStatusMessage: - log.Debug("Got PeerStatus", "msg", msg) // Got a peer status. bcR.pool.SetPeerHeight(src.Key, msg.Height) default: @@ -165,7 +162,9 @@ FOR_LOOP: case peerId := <-bcR.timeoutsCh: // chan string // Peer timed out. peer := bcR.sw.Peers().Get(peerId) - bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + if peer != nil { + bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + } case _ = <-trySyncTicker.C: // chan time var lastValidatedBlock *types.Block SYNC_LOOP: From 788f9bfb933b3f3882d7d25075e973da2f8daa30 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 13:01:28 -0700 Subject: [PATCH 10/15] started/stopped -> running; contiguous vs fast forward ConsensusState updates. --- blockchain/reactor.go | 20 ++++++++++++-------- consensus/reactor.go | 30 ++++++++++++++++++++++-------- consensus/state.go | 8 ++++---- daemon/daemon.go | 2 +- p2p/peer.go | 19 +++++++++---------- 5 files changed, 48 insertions(+), 31 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index f80abbe34..56ae6a241 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -35,8 +35,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block quit chan struct{} - started uint32 - stopped uint32 + running uint32 } func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor { @@ -57,15 +56,14 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor requestsCh: requestsCh, timeoutsCh: timeoutsCh, quit: make(chan struct{}), - started: 0, - stopped: 0, + running: uint32(0), } return bcR } // Implements Reactor func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&bcR.started, 0, 1) { + if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) { log.Info("Starting BlockchainReactor") bcR.sw = sw bcR.pool.Start() @@ -75,7 +73,7 @@ func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { // Implements Reactor func (bcR *BlockchainReactor) Stop() { - if atomic.CompareAndSwapUint32(&bcR.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) { log.Info("Stopping BlockchainReactor") close(bcR.quit) bcR.pool.Stop() @@ -201,8 +199,14 @@ FOR_LOOP: // method of syncing in the consensus reactor. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { go func() { - bcR.sw.Reactor("BLOCKCHAIN").Stop() - bcR.sw.Reactor("CONSENSUS").Start(bcR.sw) + log.Info("Stopping blockpool syncing, turning on consensus...") + //bcR.sw.Reactor("BLOCKCHAIN").Stop() + trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. + conR := bcR.sw.Reactor("CONSENSUS") + conR.Start(bcR.sw) + for _, peer := range bcR.sw.Peers().List() { + conR.AddPeer(peer) + } }() break FOR_LOOP } diff --git a/consensus/reactor.go b/consensus/reactor.go index bbc39c89c..30ebde879 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -33,8 +33,7 @@ const ( // We atomically copy the RoundState struct before using it. type ConsensusReactor struct { sw *p2p.Switch - started uint32 - stopped uint32 + running uint32 quit chan struct{} blockStore *bc.BlockStore @@ -52,7 +51,7 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto // Implements Reactor func (conR *ConsensusReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&conR.started, 0, 1) { + if atomic.CompareAndSwapUint32(&conR.running, 0, 1) { log.Info("Starting ConsensusReactor") conR.sw = sw conR.conS.Start() @@ -62,15 +61,15 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) { // Implements Reactor func (conR *ConsensusReactor) Stop() { - if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&conR.running, 1, 0) { log.Info("Stopping ConsensusReactor") conR.conS.Stop() close(conR.quit) } } -func (conR *ConsensusReactor) IsStopped() bool { - return atomic.LoadUint32(&conR.stopped) == 1 +func (conR *ConsensusReactor) IsRunning() bool { + return atomic.LoadUint32(&conR.running) == 0 } // Implements Reactor @@ -94,6 +93,10 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { + if !conR.IsRunning() { + return + } + // Create peerState for peer peerState := NewPeerState(peer) peer.Data.Set(peerStateKey, peerState) @@ -108,11 +111,18 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Implements Reactor func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + if !conR.IsRunning() { + return + } + //peer.Data.Get(peerStateKey).(*PeerState).Disconnect() } // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { + if !conR.IsRunning() { + return + } // Get round state rs := conR.conS.GetRoundState() @@ -215,6 +225,10 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } +func (conR *ConsensusReactor) UpdateToState(state *sm.State) { + conR.conS.updateToState(state, false) +} + //-------------------------------------- func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { @@ -279,7 +293,7 @@ func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) { OUTER_LOOP: for { // Manage disconnects from self or peer. - if peer.IsStopped() || conR.IsStopped() { + if !peer.IsRunning() || !conR.IsRunning() { log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer)) return } @@ -382,7 +396,7 @@ func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) OUTER_LOOP: for { // Manage disconnects from self or peer. - if peer.IsStopped() || conR.IsStopped() { + if !peer.IsRunning() || !conR.IsRunning() { log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer)) return } diff --git a/consensus/state.go b/consensus/state.go index e2738ca98..683612496 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -256,7 +256,7 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto runActionCh: make(chan RoundAction, 1), newStepCh: make(chan *RoundState, 1), } - cs.updateToState(state) + cs.updateToState(state, true) return cs } @@ -457,9 +457,9 @@ ACTION_LOOP: // If calculated round is greater than 0 (based on BlockTime or calculated StartTime) // then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound. // Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight. -func (cs *ConsensusState) updateToState(state *sm.State) { +func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { // Sanity check state. - if cs.Height > 0 && cs.Height != state.LastBlockHeight { + if contiguous && cs.Height > 0 && cs.Height != state.LastBlockHeight { panic(Fmt("updateToState() expected state height of %v but found %v", cs.Height, state.LastBlockHeight)) } @@ -859,7 +859,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool { // We have the block, so save/stage/sign-commit-vote. cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits) // Increment height. - cs.updateToState(cs.stagedState) + cs.updateToState(cs.stagedState, true) // cs.Step is now RoundStepNewHeight or RoundStepNewRound cs.newStepCh <- cs.getRoundState() return true diff --git a/daemon/daemon.go b/daemon/daemon.go index ddc310762..1b029f9d7 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -94,7 +94,7 @@ func (n *Node) Start() { go n.inboundConnectionRoutine(l) } n.book.Start() - n.sw.StartReactors() + //n.sw.StartReactors() } func (n *Node) Stop() { diff --git a/p2p/peer.go b/p2p/peer.go index 68137a63a..173297eb0 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -13,8 +13,7 @@ import ( type Peer struct { outbound bool mconn *MConnection - started uint32 - stopped uint32 + running uint32 Key string Data *CMap // User data. @@ -37,7 +36,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc p = &Peer{ outbound: outbound, mconn: mconn, - stopped: 0, + running: 0, Key: mconn.RemoteAddress.String(), Data: NewCMap(), } @@ -45,21 +44,21 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc } func (p *Peer) start() { - if atomic.CompareAndSwapUint32(&p.started, 0, 1) { + if atomic.CompareAndSwapUint32(&p.running, 0, 1) { log.Debug("Starting Peer", "peer", p) p.mconn.Start() } } func (p *Peer) stop() { - if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&p.running, 1, 0) { log.Debug("Stopping Peer", "peer", p) p.mconn.Stop() } } -func (p *Peer) IsStopped() bool { - return atomic.LoadUint32(&p.stopped) == 1 +func (p *Peer) IsRunning() bool { + return atomic.LoadUint32(&p.running) == 1 } func (p *Peer) Connection() *MConnection { @@ -71,21 +70,21 @@ func (p *Peer) IsOutbound() bool { } func (p *Peer) Send(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.Send(chId, msg) } func (p *Peer) TrySend(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.TrySend(chId, msg) } func (p *Peer) CanSend(chId byte) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.CanSend(chId) From a2b8318aaccc343b41da9a4b140135dcd25c2a03 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 13:17:45 -0700 Subject: [PATCH 11/15] allow BlockchainReactor to reset ConsensusReactor state --- blockchain/pool.go | 3 +++ blockchain/reactor.go | 6 ++++++ consensus/reactor.go | 1 + 3 files changed, 10 insertions(+) diff --git a/blockchain/pool.go b/blockchain/pool.go index 192165bf7..577701519 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -339,16 +339,19 @@ func requestRoutine(pool *BlockPool, height uint) { PICK_LOOP: for { if !pool.IsRunning() { + log.Debug("BlockPool not running. Stopping requestRoutine", "height", height) return } peer = pool.pickIncrAvailablePeer(height) if peer == nil { + log.Debug("No peers available", "height", height) time.Sleep(requestIntervalMS * time.Millisecond) continue PICK_LOOP } break PICK_LOOP } + log.Debug("Selected peer for request", "height", height, "peerId", peer.id) pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 56ae6a241..a3e93e989 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -25,6 +25,10 @@ const ( stopSyncingDurationMinutes = 10 ) +type stateResetter interface { + ResetToState(*sm.State) +} + // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { sw *p2p.Switch @@ -93,6 +97,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { + log.Debug("BlockchainReactor AddPeer", "peer", peer) // Send peer our state. peer.Send(BlockchainChannel, PeerStatusMessage{bcR.store.Height()}) } @@ -203,6 +208,7 @@ FOR_LOOP: //bcR.sw.Reactor("BLOCKCHAIN").Stop() trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. conR := bcR.sw.Reactor("CONSENSUS") + conR.(stateResetter).ResetToState(bcR.state) conR.Start(bcR.sw) for _, peer := range bcR.sw.Peers().List() { conR.AddPeer(peer) diff --git a/consensus/reactor.go b/consensus/reactor.go index 30ebde879..56cadcf9d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -225,6 +225,7 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } +// Fast forward to some state. func (conR *ConsensusReactor) UpdateToState(state *sm.State) { conR.conS.updateToState(state, false) } From bd6d9d646df967b6d94441837cf702b5ea1d7573 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 13:22:46 -0700 Subject: [PATCH 12/15] start peer before AddPeer() on reactors. --- consensus/reactor.go | 4 ++-- p2p/switch.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 56cadcf9d..8d992bba7 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -225,8 +225,8 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } -// Fast forward to some state. -func (conR *ConsensusReactor) UpdateToState(state *sm.State) { +// Reset to some state. +func (conR *ConsensusReactor) ResetToState(state *sm.State) { conR.conS.updateToState(state, false) } diff --git a/p2p/switch.go b/p2p/switch.go index 10da29f36..105561314 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -131,12 +131,12 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er return nil, ErrSwitchDuplicatePeer } - // Notify listeners. - sw.doAddPeer(peer) - // Start the peer go peer.start() + // Notify listeners. + sw.doAddPeer(peer) + // Send handshake msg := &pexHandshakeMessage{Network: sw.network} peer.Send(PexChannel, msg) From 7171823fc6d667904cfdc926a51f2848a804a04d Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 17:16:49 -0700 Subject: [PATCH 13/15] Fix blockpool bugs and clean up log messages. --- blockchain/pool.go | 21 ++++++--------- blockchain/reactor.go | 61 +++++++++++++++++++++++-------------------- consensus/reactor.go | 2 +- p2p/addrbook.go | 2 +- p2p/connection.go | 1 + p2p/pex_reactor.go | 10 ++++--- p2p/switch.go | 2 +- 7 files changed, 51 insertions(+), 48 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 577701519..0bd620463 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -10,14 +10,12 @@ import ( ) const ( - maxOutstandingRequestsPerPeer = 10 - inputsChannelCapacity = 100 - maxTries = 3 - requestIntervalMS = 500 - requestBatchSize = 50 - maxPendingRequests = 50 - maxTotalRequests = 100 - maxRequestsPerPeer = 20 + maxTries = 3 + inputsChannelCapacity = 200 + requestIntervalMS = 500 + maxPendingRequests = 200 + maxTotalRequests = 300 + maxRequestsPerPeer = 300 ) var ( @@ -85,9 +83,7 @@ RUN_LOOP: if atomic.LoadInt32(&pool.running) == 0 { break RUN_LOOP } - height, numPending, numTotal := pool.GetStatus() - log.Debug("BlockPool.run", "height", height, "numPending", numPending, - "numTotal", numTotal) + _, numPending, numTotal := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) @@ -344,14 +340,13 @@ func requestRoutine(pool *BlockPool, height uint) { } peer = pool.pickIncrAvailablePeer(height) if peer == nil { - log.Debug("No peers available", "height", height) + //log.Debug("No peers available", "height", height) time.Sleep(requestIntervalMS * time.Millisecond) continue PICK_LOOP } break PICK_LOOP } - log.Debug("Selected peer for request", "height", height, "peerId", peer.id) pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index a3e93e989..b0ae92455 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -90,16 +90,15 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { &p2p.ChannelDescriptor{ Id: BlockchainChannel, Priority: 5, - SendQueueCapacity: 20, // Queue 20 blocks to send to a peer. + SendQueueCapacity: 100, }, } } // Implements Reactor func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { - log.Debug("BlockchainReactor AddPeer", "peer", peer) // Send peer our state. - peer.Send(BlockchainChannel, PeerStatusMessage{bcR.store.Height()}) + peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()}) } // Implements Reactor @@ -115,14 +114,15 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) log.Warn("Error decoding message", "error", err) return } - log.Debug("BlockchainReactor received message", "msg", msg_) + + log.Info("Received message", "msg", msg_) switch msg := msg_.(type) { - case BlockRequestMessage: + case bcBlockRequestMessage: // Got a request for a block. Respond with block if we have it. block := bcR.store.LoadBlock(msg.Height) if block != nil { - msg := BlockResponseMessage{Block: block} + msg := bcBlockResponseMessage{Block: block} queued := src.TrySend(BlockchainChannel, msg) if !queued { // queue is full, just ignore. @@ -130,10 +130,10 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) } else { // TODO peer is asking for things we don't have. } - case BlockResponseMessage: + case bcBlockResponseMessage: // Got a block. bcR.pool.AddBlock(msg.Block, src.Key) - case PeerStatusMessage: + case bcPeerStatusMessage: // Got a peer status. bcR.pool.SetPeerHeight(src.Key, msg.Height) default: @@ -155,7 +155,7 @@ FOR_LOOP: // We can't fulfill the request. continue FOR_LOOP } - msg := BlockRequestMessage{request.Height} + msg := bcBlockRequestMessage{request.Height} queued := peer.TrySend(BlockchainChannel, msg) if !queued { // We couldn't queue the request. @@ -174,24 +174,29 @@ FOR_LOOP: for i := 0; i < 10; i++ { // See if there are any blocks to sync. first, second := bcR.pool.PeekTwoBlocks() + //log.Debug("TrySync peeked", "first", first, "second", second) if first == nil || second == nil { // We need both to sync the first block. break SYNC_LOOP } - firstParts := first.MakePartSet().Header() + firstParts := first.MakePartSet() + firstPartsHeader := firstParts.Header() // Finally, verify the first block using the second's validation. err := bcR.state.BondedValidators.VerifyValidation( - first.Hash(), firstParts, first.Height, second.Validation) + first.Hash(), firstPartsHeader, first.Height, second.Validation) if err != nil { + log.Debug("error in validation", "error", err) bcR.pool.RedoRequest(first.Height) break SYNC_LOOP } else { bcR.pool.PopRequest() - err := bcR.state.AppendBlock(first, firstParts) + err := bcR.state.AppendBlock(first, firstPartsHeader) if err != nil { // TODO This is bad, are we zombie? panic(Fmt("Failed to process committed block: %v", err)) } + bcR.store.SaveBlock(first, firstParts, second.Validation) + bcR.state.Save() lastValidatedBlock = first } } @@ -224,7 +229,7 @@ FOR_LOOP: } func (bcR *BlockchainReactor) BroadcastStatus() error { - bcR.sw.Broadcast(BlockchainChannel, PeerStatusMessage{bcR.store.Height()}) + bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()}) return nil } @@ -245,11 +250,11 @@ func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) { r := bytes.NewReader(bz) switch msgType { case msgTypeBlockRequest: - msg = binary.ReadBinary(BlockRequestMessage{}, r, n, &err) + msg = binary.ReadBinary(bcBlockRequestMessage{}, r, n, &err) case msgTypeBlockResponse: - msg = binary.ReadBinary(BlockResponseMessage{}, r, n, &err) + msg = binary.ReadBinary(bcBlockResponseMessage{}, r, n, &err) case msgTypePeerStatus: - msg = binary.ReadBinary(PeerStatusMessage{}, r, n, &err) + msg = binary.ReadBinary(bcPeerStatusMessage{}, r, n, &err) default: msg = nil } @@ -258,36 +263,36 @@ func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) { //------------------------------------- -type BlockRequestMessage struct { +type bcBlockRequestMessage struct { Height uint } -func (m BlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest } +func (m bcBlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest } -func (m BlockRequestMessage) String() string { - return fmt.Sprintf("[BlockRequestMessage %v]", m.Height) +func (m bcBlockRequestMessage) String() string { + return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) } //------------------------------------- -type BlockResponseMessage struct { +type bcBlockResponseMessage struct { Block *types.Block } -func (m BlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse } +func (m bcBlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse } -func (m BlockResponseMessage) String() string { - return fmt.Sprintf("[BlockResponseMessage %v]", m.Block.Height) +func (m bcBlockResponseMessage) String() string { + return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) } //------------------------------------- -type PeerStatusMessage struct { +type bcPeerStatusMessage struct { Height uint } -func (m PeerStatusMessage) TypeByte() byte { return msgTypePeerStatus } +func (m bcPeerStatusMessage) TypeByte() byte { return msgTypePeerStatus } -func (m PeerStatusMessage) String() string { - return fmt.Sprintf("[PeerStatusMessage %v]", m.Height) +func (m bcPeerStatusMessage) String() string { + return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 8d992bba7..a3028b6d5 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -69,7 +69,7 @@ func (conR *ConsensusReactor) Stop() { } func (conR *ConsensusReactor) IsRunning() bool { - return atomic.LoadUint32(&conR.running) == 0 + return atomic.LoadUint32(&conR.running) == 1 } // Implements Reactor diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 43cad607c..893cf1c9e 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -381,7 +381,7 @@ out: for { select { case <-dumpAddressTicker.C: - log.Debug("Saving book to file", "size", a.Size()) + log.Debug("Saving AddrBook to file", "size", a.Size()) a.saveToFile(a.filePath) case <-a.quit: break out diff --git a/p2p/connection.go b/p2p/connection.go index 0e26480f1..c75538365 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -417,6 +417,7 @@ FOR_LOOP: } msgBytes := channel.recvMsgPacket(pkt) if msgBytes != nil { + log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes) c.onReceive(pkt.ChannelId, msgBytes) } default: diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 439b4cf9b..5ba238d31 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -176,10 +176,12 @@ func (pexR *PEXReactor) ensurePeers() { alreadyDialing := pexR.sw.IsDialing(try) alreadyConnected := pexR.sw.Peers().Has(try.String()) if alreadySelected || alreadyDialing || alreadyConnected { - log.Debug("Cannot dial address", "addr", try, - "alreadySelected", alreadySelected, - "alreadyDialing", alreadyDialing, - "alreadyConnected", alreadyConnected) + /* + log.Debug("Cannot dial address", "addr", try, + "alreadySelected", alreadySelected, + "alreadyDialing", alreadyDialing, + "alreadyConnected", alreadyConnected) + */ continue } else { log.Debug("Will dial address", "addr", try) diff --git a/p2p/switch.go b/p2p/switch.go index 105561314..1eb513089 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -132,7 +132,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } // Start the peer - go peer.start() + peer.start() // Notify listeners. sw.doAddPeer(peer) From bd767c1fab2123aa94b637723f3fb278f295c08c Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 26 Mar 2015 00:35:16 -0700 Subject: [PATCH 14/15] Make fast_sync a command-line flag --- blockchain/reactor.go | 56 ++++++++++++++++++++++++------------------- config/config.go | 4 ++++ daemon/daemon.go | 10 +++++--- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index b0ae92455..6d65708f2 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -35,6 +35,7 @@ type BlockchainReactor struct { state *sm.State store *BlockStore pool *BlockPool + sync bool requestsCh chan BlockRequest timeoutsCh chan string lastBlock *types.Block @@ -42,7 +43,7 @@ type BlockchainReactor struct { running uint32 } -func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor { +func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor { if state.LastBlockHeight != store.Height() { panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) } @@ -57,6 +58,7 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor state: state, store: store, pool: pool, + sync: sync, requestsCh: requestsCh, timeoutsCh: timeoutsCh, quit: make(chan struct{}), @@ -71,7 +73,9 @@ func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { log.Info("Starting BlockchainReactor") bcR.sw = sw bcR.pool.Start() - go bcR.poolRoutine() + if bcR.sync { + go bcR.poolRoutine() + } } } @@ -169,7 +173,7 @@ FOR_LOOP: bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) } case _ = <-trySyncTicker.C: // chan time - var lastValidatedBlock *types.Block + //var lastValidatedBlock *types.Block SYNC_LOOP: for i := 0; i < 10; i++ { // See if there are any blocks to sync. @@ -197,30 +201,32 @@ FOR_LOOP: } bcR.store.SaveBlock(first, firstParts, second.Validation) bcR.state.Save() - lastValidatedBlock = first + //lastValidatedBlock = first } } - // We're done syncing for now (will do again shortly) - // See if we want to stop syncing and turn on the - // consensus reactor. - // TODO: use other heuristics too besides blocktime. - // It's not a security concern, as it only needs to happen - // upon node sync, and there's also a second (slower) - // method of syncing in the consensus reactor. - if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { - go func() { - log.Info("Stopping blockpool syncing, turning on consensus...") - //bcR.sw.Reactor("BLOCKCHAIN").Stop() - trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. - conR := bcR.sw.Reactor("CONSENSUS") - conR.(stateResetter).ResetToState(bcR.state) - conR.Start(bcR.sw) - for _, peer := range bcR.sw.Peers().List() { - conR.AddPeer(peer) - } - }() - break FOR_LOOP - } + /* + // We're done syncing for now (will do again shortly) + // See if we want to stop syncing and turn on the + // consensus reactor. + // TODO: use other heuristics too besides blocktime. + // It's not a security concern, as it only needs to happen + // upon node sync, and there's also a second (slower) + // method of syncing in the consensus reactor. + + if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { + go func() { + log.Info("Stopping blockpool syncing, turning on consensus...") + trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. + conR := bcR.sw.Reactor("CONSENSUS") + conR.(stateResetter).ResetToState(bcR.state) + conR.Start(bcR.sw) + for _, peer := range bcR.sw.Peers().List() { + conR.AddPeer(peer) + } + }() + break FOR_LOOP + } + */ continue FOR_LOOP case <-bcR.quit: break FOR_LOOP diff --git a/config/config.go b/config/config.go index 0abb40d03..48973a162 100644 --- a/config/config.go +++ b/config/config.go @@ -104,6 +104,8 @@ func initDefaults(rootDir string) { app.SetDefault("GenesisFile", rootDir+"/genesis.json") app.SetDefault("AddrBookFile", rootDir+"/addrbook.json") app.SetDefault("PrivValidatorfile", rootDir+"/priv_validator.json") + + app.SetDefault("FastSync", false) } func Init(rootDir string) { @@ -161,6 +163,7 @@ func ParseFlags(args []string) { flags.BoolVar(&printHelp, "help", false, "Print this help message.") flags.String("listen_addr", app.GetString("ListenAddr"), "Listen address. (0.0.0.0:0 means any interface, any port)") flags.String("seed_node", app.GetString("SeedNode"), "Address of seed node") + flags.Bool("fast_sync", app.GetBool("FastSync"), "Fast blockchain syncing") flags.String("rpc_http_listen_addr", app.GetString("RPC.HTTP.ListenAddr"), "RPC listen address. Port required") flags.Parse(args) if printHelp { @@ -171,6 +174,7 @@ func ParseFlags(args []string) { // Merge parsed flag values onto app. app.BindPFlag("ListenAddr", flags.Lookup("listen_addr")) app.BindPFlag("SeedNode", flags.Lookup("seed_node")) + app.BindPFlag("FastSync", flags.Lookup("fast_sync")) app.BindPFlag("RPC.HTTP.ListenAddr", flags.Lookup("rpc_http_listen_addr")) // Confused? diff --git a/daemon/daemon.go b/daemon/daemon.go index 1b029f9d7..dc43e2fa1 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -55,7 +55,7 @@ func NewNode() *Node { pexReactor := p2p.NewPEXReactor(book) // Get BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state, blockStore) + bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync")) // Get MempoolReactor mempool := mempl.NewMempool(state.Copy()) @@ -71,9 +71,13 @@ func NewNode() *Node { sw := p2p.NewSwitch() sw.SetNetwork(config.App().GetString("Network")) sw.AddReactor("PEX", pexReactor).Start(sw) - sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw) sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw) - sw.AddReactor("CONSENSUS", consensusReactor) // Do not start yet. + sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw) + if !config.App().GetBool("FastSync") { + sw.AddReactor("CONSENSUS", consensusReactor).Start(sw) + } else { + sw.AddReactor("CONSENSUS", consensusReactor) + } return &Node{ sw: sw, From af3c418ea9c49d05b7fad0992165acf209632f30 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 26 Mar 2015 00:52:07 -0700 Subject: [PATCH 15/15] comment fixes --- p2p/switch_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index f486230f5..ffdb9950c 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -68,10 +68,10 @@ func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { //----------------------------------------------------------------------------- -// convenience method for creating bar switches connected to each other. +// convenience method for creating two switches connected to each other. func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) { - // Create bar switches that will be interconnected. + // Create two switches that will be interconnected. s1 := initSwitch(NewSwitch()) s2 := initSwitch(NewSwitch()) @@ -105,7 +105,7 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S func TestSwitches(t *testing.T) { s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch { - // Make bar reactors of bar channels each + // Make two reactors of two channels each sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10},