diff --git a/blockchain/pool.go b/blockchain/pool.go index 528c66548..0bd620463 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -1,7 +1,7 @@ package blockchain import ( - "math/rand" + "sync" "sync/atomic" "time" @@ -10,345 +10,367 @@ import ( ) const ( - maxOutstandingRequestsPerPeer = 10 - eventsChannelCapacity = 100 - requestTimeoutSeconds = 10 - maxTries = 3 - requestIntervalMS = 500 - requestBatchSize = 50 - maxPendingRequests = 50 - maxTotalRequests = 100 - maxPeersPerRequest = 1 + maxTries = 3 + inputsChannelCapacity = 200 + requestIntervalMS = 500 + maxPendingRequests = 200 + maxTotalRequests = 300 + maxRequestsPerPeer = 300 ) -type BlockRequest struct { - Height uint - PeerId string -} +var ( + requestTimeoutSeconds = time.Duration(1) +) type BlockPool struct { - peers map[string]*bpPeer - blockInfos map[uint]*bpBlockInfo - height uint // the lowest key in blockInfos. - started int32 // atomic - stopped int32 // atomic - numPending int32 - numTotal int32 - eventsCh chan interface{} // internal events. - requestsCh chan<- BlockRequest // output of new requests to make. - timeoutsCh chan<- string // output of peers that timed out. - blocksCh chan<- *types.Block // output of ordered blocks. - repeater *RepeatTimer // for requesting more bocks. - quit chan struct{} + // block requests + requestsMtx sync.Mutex + requests map[uint]*bpRequest + height uint // the lowest key in requests. + numPending int32 + numTotal int32 + + // peers + peersMtx sync.Mutex + peers map[string]*bpPeer + + requestsCh chan<- BlockRequest + timeoutsCh chan<- string + repeater *RepeatTimer + + running int32 // atomic } -func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *types.Block) *BlockPool { +func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { return &BlockPool{ - peers: make(map[string]*bpPeer), - blockInfos: make(map[uint]*bpBlockInfo), + peers: make(map[string]*bpPeer), + + requests: make(map[uint]*bpRequest), height: start, - started: 0, - stopped: 0, numPending: 0, numTotal: 0, - quit: make(chan struct{}), - eventsCh: make(chan interface{}, eventsChannelCapacity), requestsCh: requestsCh, timeoutsCh: timeoutsCh, - blocksCh: blocksCh, repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), + + running: 0, } } -func (bp *BlockPool) Start() { - if atomic.CompareAndSwapInt32(&bp.started, 0, 1) { +func (pool *BlockPool) Start() { + if atomic.CompareAndSwapInt32(&pool.running, 0, 1) { log.Info("Starting BlockPool") - go bp.run() + go pool.run() } } -func (bp *BlockPool) Stop() { - if atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) { +func (pool *BlockPool) Stop() { + if atomic.CompareAndSwapInt32(&pool.running, 1, 0) { log.Info("Stopping BlockPool") - close(bp.quit) - close(bp.eventsCh) - close(bp.requestsCh) - close(bp.timeoutsCh) - close(bp.blocksCh) - bp.repeater.Stop() + pool.repeater.Stop() } } -// AddBlock should be called when a block is received. -func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { - bp.eventsCh <- bpBlockResponse{block, peerId} +func (pool *BlockPool) IsRunning() bool { + return atomic.LoadInt32(&pool.running) == 1 } -func (bp *BlockPool) SetPeerStatus(peerId string, height uint) { - bp.eventsCh <- bpPeerStatus{peerId, height} -} - -// Runs in a goroutine and processes messages. -func (bp *BlockPool) run() { -FOR_LOOP: +// Run spawns requests as needed. 
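+// While the pool is running it checks GetStatus() each pass: if numPending or
+// numTotal has hit its cap it sleeps for requestIntervalMS, otherwise it calls
+// makeRequest() for the next height, which spawns a requestRoutine.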
+func (pool *BlockPool) run() { +RUN_LOOP: for { - select { - case msg := <-bp.eventsCh: - bp.handleEvent(msg) - case <-bp.repeater.Ch: - bp.makeMoreBlockInfos() - bp.requestBlocksFromRandomPeers(10) - case <-bp.quit: - break FOR_LOOP + if atomic.LoadInt32(&pool.running) == 0 { + break RUN_LOOP } - } -} - -func (bp *BlockPool) handleEvent(event_ interface{}) { - switch event := event_.(type) { - case bpBlockResponse: - peer := bp.peers[event.peerId] - blockInfo := bp.blockInfos[event.block.Height] - if blockInfo == nil { - // block was unwanted. - if peer != nil { - peer.bad++ - } + _, numPending, numTotal := pool.GetStatus() + if numPending >= maxPendingRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) + } else if numTotal >= maxTotalRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) } else { - // block was wanted. - if peer != nil { - peer.good++ - } - delete(peer.requests, event.block.Height) - if blockInfo.block == nil { - // peer is the first to give it to us. - blockInfo.block = event.block - blockInfo.blockBy = peer.id - bp.numPending-- - if event.block.Height == bp.height { - go bp.pushBlocksFromStart() - } - } - } - case bpPeerStatus: // updated or new status from peer - // request blocks if possible. - peer := bp.peers[event.peerId] - if peer == nil { - peer = bpNewPeer(event.peerId, event.height) - bp.peers[peer.id] = peer - } - bp.requestBlocksFromPeer(peer) - case bpRequestTimeout: // unconditional timeout for each peer's request. - peer := bp.peers[event.peerId] - if peer == nil { - // cleanup was already handled. - return - } - height := event.height - request := peer.requests[height] - if request == nil || request.block != nil { - // the request was fulfilled by some peer or this peer. - return + // request for more blocks. + height := pool.nextHeight() + pool.makeRequest(height) } + } +} - // A request for peer timed out. - peer.bad++ - if request.tries < maxTries { - log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id) - // try again. - select { - case bp.requestsCh <- BlockRequest{height, peer.id}: - request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries - default: - // The request cannot be made because requestCh is full. - // Just delete the request. - delete(peer.requests, height) - } +func (pool *BlockPool) GetStatus() (uint, int32, int32) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + return pool.height, pool.numPending, pool.numTotal +} + +// We need to see the second block's Validation to validate the first block. +// So we peek two blocks at a time. +func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + if r := pool.requests[pool.height]; r != nil { + first = r.block + } + if r := pool.requests[pool.height+1]; r != nil { + second = r.block + } + return +} + +// Pop the first block at pool.height +// It must have been validated by 'second'.Validation from PeekTwoBlocks(). +func (pool *BlockPool) PopRequest() { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + if r := pool.requests[pool.height]; r == nil || r.block == nil { + panic("PopRequest() requires a valid block") + } + + delete(pool.requests, pool.height) + pool.height++ + pool.numTotal-- +} + +// Invalidates the block at pool.height. +// Remove the peer and request from others. 
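+// The offending peer is removed from the pool, the request's block and peerId
+// are cleared, numPending is re-incremented, and a new requestRoutine is
+// spawned to fetch the block again from another peer.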
+func (pool *BlockPool) RedoRequest(height uint) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[height] + if request.block == nil { + panic("Expected block to be non-nil") + } + pool.RemovePeer(request.peerId) // Lock on peersMtx. + request.block = nil + request.peerId = "" + pool.numPending++ + + go requestRoutine(pool, height) +} + +func (pool *BlockPool) hasBlock(height uint) bool { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[height] + return request != nil && request.block != nil +} + +func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[height] + if request == nil { + return + } + request.peerId = peerId +} + +func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[block.Height] + if request == nil { + return + } + if request.peerId != peerId { + return + } + if request.block != nil { + return + } + request.block = block + pool.numPending-- +} + +func (pool *BlockPool) getPeer(peerId string) *bpPeer { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() + + peer := pool.peers[peerId] + return peer +} + +// Sets the peer's blockchain height. +func (pool *BlockPool) SetPeerHeight(peerId string, height uint) { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() + + peer := pool.peers[peerId] + if peer != nil { + peer.height = height + } else { + peer = &bpPeer{ + height: height, + id: peerId, + numRequests: 0, + } + pool.peers[peerId] = peer + } +} + +func (pool *BlockPool) RemovePeer(peerId string) { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() + + delete(pool.peers, peerId) +} + +// Pick an available peer with at least the given minHeight. +// If no peers are available, returns nil. 
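+// Peers that already have maxRequestsPerPeer outstanding requests are skipped.
+// On success the peer's numRequests counter is incremented; requestRoutine
+// decrements it again via decrPeer() when the block arrives or the height is
+// no longer needed.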
+func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() + + for _, peer := range pool.peers { + if peer.numRequests >= maxRequestsPerPeer { + continue + } + if peer.height < minHeight { + continue + } + peer.numRequests++ + return peer + } + + return nil +} + +func (pool *BlockPool) decrPeer(peerId string) { + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() + + peer := pool.peers[peerId] + if peer == nil { + return + } + peer.numRequests-- +} + +func (pool *BlockPool) nextHeight() uint { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + return pool.height + uint(pool.numTotal) +} + +func (pool *BlockPool) makeRequest(height uint) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := &bpRequest{ + height: height, + peerId: "", + block: nil, + } + pool.requests[height] = request + + nextHeight := pool.height + uint(pool.numTotal) + if nextHeight == height { + pool.numTotal++ + pool.numPending++ + } + + go requestRoutine(pool, height) +} + +func (pool *BlockPool) sendRequest(height uint, peerId string) { + if atomic.LoadInt32(&pool.running) == 0 { + return + } + pool.requestsCh <- BlockRequest{height, peerId} +} + +func (pool *BlockPool) sendTimeout(peerId string) { + if atomic.LoadInt32(&pool.running) == 0 { + return + } + pool.timeoutsCh <- peerId +} + +func (pool *BlockPool) debug() string { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + str := "" + for h := pool.height; h < pool.height+uint(pool.numTotal); h++ { + if pool.requests[h] == nil { + str += Fmt("H(%v):X ", h) } else { - log.Warn("Timeout: Deleting request") - // delete the request. - delete(peer.requests, height) - blockInfo := bp.blockInfos[height] - if blockInfo != nil { - delete(blockInfo.requests, peer.id) - } - select { - case bp.timeoutsCh <- peer.id: - default: - } - + str += Fmt("H(%v):", h) + str += Fmt("B?(%v) ", pool.requests[h].block != nil) } } -} - -// NOTE: This function is sufficient, but we should find pending blocks -// and sample the peers in one go rather than the current O(n^2) impl. -func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) { - chosen := bp.pickAvailablePeers(maxPeers) - log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen)) - for _, peer := range chosen { - bp.requestBlocksFromPeer(peer) - } -} - -func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { - // If peer is available and can provide something... - for height := bp.height; peer.available(); height++ { - blockInfo := bp.blockInfos[height] - if blockInfo == nil { - // We're out of range. - return - } - needsMorePeers := blockInfo.needsMorePeers() - alreadyAskedPeer := blockInfo.requests[peer.id] != nil - if needsMorePeers && !alreadyAskedPeer { - select { - case bp.requestsCh <- BlockRequest{height, peer.id}: - // Create a new request and start the timer. - request := &bpBlockRequest{ - height: height, - peer: peer, - } - blockInfo.requests[peer.id] = request - peer.requests[height] = request - request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries - default: - // The request cannot be made because requestCh is full. - // Just stop. - return - } - } - } -} - -func (bp *BlockPool) makeMoreBlockInfos() { - // make more requests if necessary. 
- for i := 0; i < requestBatchSize; i++ { - //log.Debug("Confused?", - // "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests) - if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests { - // Make a request for the next block height - requestHeight := bp.height + uint(bp.numTotal) - log.Debug("New blockInfo", "height", requestHeight) - blockInfo := bpNewBlockInfo(requestHeight) - bp.blockInfos[requestHeight] = blockInfo - bp.numPending++ - bp.numTotal++ - } else { - break - } - } -} - -func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer { - available := []*bpPeer{} - for _, peer := range bp.peers { - if peer.available() { - available = append(available, peer) - } - } - perm := rand.Perm(MinInt(choose, len(available))) - chosen := make([]*bpPeer, len(perm)) - for i, idx := range perm { - chosen[i] = available[idx] - } - return chosen -} - -// blocking -func (bp *BlockPool) pushBlocksFromStart() { - for height := bp.height; ; height++ { - // push block to blocksCh. - blockInfo := bp.blockInfos[height] - if blockInfo == nil || blockInfo.block == nil { - break - } - bp.numTotal-- - bp.height++ - delete(bp.blockInfos, height) - bp.blocksCh <- blockInfo.block - } -} - -//----------------------------------------------------------------------------- - -type bpBlockInfo struct { - height uint - requests map[string]*bpBlockRequest - block *types.Block // first block received - blockBy string // peerId of source -} - -func bpNewBlockInfo(height uint) *bpBlockInfo { - return &bpBlockInfo{ - height: height, - requests: make(map[string]*bpBlockRequest), - } -} - -func (blockInfo *bpBlockInfo) needsMorePeers() bool { - return len(blockInfo.requests) < maxPeersPerRequest -} - -//------------------------------------- - -type bpBlockRequest struct { - peer *bpPeer - height uint - block *types.Block - tries int -} - -// bump tries++ and set timeout. -// NOTE: the timer is unconditional. -func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) { - request.tries++ - time.AfterFunc(requestTimeoutSeconds*time.Second, func() { - eventsCh <- bpRequestTimeout{ - peerId: request.peer.id, - height: request.height, - } - }) + return str } //------------------------------------- type bpPeer struct { - id string - height uint - requests map[uint]*bpBlockRequest - // Count good/bad events from peer. - good uint - bad uint + id string + height uint + numRequests int32 } -func bpNewPeer(peerId string, height uint) *bpPeer { - return &bpPeer{ - id: peerId, - height: height, - requests: make(map[uint]*bpBlockRequest), - } -} - -func (peer *bpPeer) available() bool { - return len(peer.requests) < maxOutstandingRequestsPerPeer +type bpRequest struct { + height uint + peerId string + block *types.Block } //------------------------------------- -// bp.eventsCh messages -type bpBlockResponse struct { - block *types.Block - peerId string +// Responsible for making more requests as necessary +// Returns when a block is found (e.g. AddBlock() is called) +func requestRoutine(pool *BlockPool, height uint) { + for { + var peer *bpPeer = nil + PICK_LOOP: + for { + if !pool.IsRunning() { + log.Debug("BlockPool not running. 
Stopping requestRoutine", "height", height) + return + } + peer = pool.pickIncrAvailablePeer(height) + if peer == nil { + //log.Debug("No peers available", "height", height) + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_LOOP + } + break PICK_LOOP + } + + pool.setPeerForRequest(height, peer.id) + + for try := 0; try < maxTries; try++ { + pool.sendRequest(height, peer.id) + time.Sleep(requestTimeoutSeconds * time.Second) + if pool.hasBlock(height) { + pool.decrPeer(peer.id) + return + } + bpHeight, _, _ := pool.GetStatus() + if height < bpHeight { + pool.decrPeer(peer.id) + return + } + } + + pool.RemovePeer(peer.id) + pool.sendTimeout(peer.id) + } } -type bpPeerStatus struct { - peerId string - height uint // blockchain tip of peer -} +//------------------------------------- -type bpRequestTimeout struct { - peerId string - height uint +type BlockRequest struct { + Height uint + PeerId string } diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 4376f3ac0..c07a11d85 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -3,6 +3,7 @@ package blockchain import ( "math/rand" "testing" + "time" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/types" @@ -24,26 +25,34 @@ func makePeers(numPeers int, minHeight, maxHeight uint) map[string]testPeer { } func TestBasic(t *testing.T) { - // 100 peers anywhere at height 0 to 1000. - peers := makePeers(100, 0, 1000) - + peers := makePeers(10, 0, 1000) start := uint(42) - maxHeight := uint(300) timeoutsCh := make(chan string, 100) requestsCh := make(chan BlockRequest, 100) - blocksCh := make(chan *types.Block, 100) - - pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() // Introduce each peer. go func() { for _, peer := range peers { - pool.SetPeerStatus(peer.id, peer.height) + pool.SetPeerHeight(peer.id, peer.height) } }() - lastSeenBlock := uint(41) + // Start a goroutine to pull blocks + go func() { + for { + if !pool.IsRunning() { + return + } + first, second := pool.PeekTwoBlocks() + if first != nil && second != nil { + pool.PopRequest() + } else { + time.Sleep(1 * time.Second) + } + } + }() // Pull from channels for { @@ -52,21 +61,15 @@ func TestBasic(t *testing.T) { t.Errorf("timeout: %v", peerId) case request := <-requestsCh: log.Debug("TEST: Pulled new BlockRequest", "request", request) - // After a while, pretend like we got a block from the peer. + if request.Height == 300 { + return // Done! + } + // Request desired, pretend like we got the block immediately. go func() { block := &types.Block{Header: &types.Header{Height: request.Height}} pool.AddBlock(block, request.PeerId) log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId) }() - case block := <-blocksCh: - log.Debug("TEST: Pulled new Block", "height", block.Height) - if block.Height != lastSeenBlock+1 { - t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height) - } - lastSeenBlock++ - if block.Height == maxHeight { - return // Done! 
- } } } @@ -74,39 +77,52 @@ func TestBasic(t *testing.T) { } func TestTimeout(t *testing.T) { - peers := makePeers(100, 0, 1000) + peers := makePeers(10, 0, 1000) start := uint(42) - timeoutsCh := make(chan string, 10) - requestsCh := make(chan BlockRequest, 10) - blocksCh := make(chan *types.Block, 100) - - pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + timeoutsCh := make(chan string, 100) + requestsCh := make(chan BlockRequest, 100) + pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() // Introduce each peer. go func() { for _, peer := range peers { - pool.SetPeerStatus(peer.id, peer.height) + pool.SetPeerHeight(peer.id, peer.height) + } + }() + + // Start a goroutine to pull blocks + go func() { + for { + if !pool.IsRunning() { + return + } + first, second := pool.PeekTwoBlocks() + if first != nil && second != nil { + pool.PopRequest() + } else { + time.Sleep(1 * time.Second) + } } }() // Pull from channels + counter := 0 + timedOut := map[string]struct{}{} for { select { case peerId := <-timeoutsCh: - // Timed out. Done! - if peers[peerId].id != peerId { - t.Errorf("Unexpected peer from timeoutsCh") + log.Debug("Timeout", "peerId", peerId) + if _, ok := timedOut[peerId]; !ok { + counter++ + if counter == len(peers) { + return // Done! + } } - return - case _ = <-requestsCh: - // Don't do anything, let it time out. - case _ = <-blocksCh: - t.Errorf("Got block when none expected") - return + case request := <-requestsCh: + log.Debug("TEST: Pulled new BlockRequest", "request", request) } } pool.Stop() - } diff --git a/blockchain/reactor.go b/blockchain/reactor.go new file mode 100644 index 000000000..6d65708f2 --- /dev/null +++ b/blockchain/reactor.go @@ -0,0 +1,304 @@ +package blockchain + +import ( + "bytes" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +const ( + BlockchainChannel = byte(0x40) + defaultChannelCapacity = 100 + defaultSleepIntervalMS = 500 + trySyncIntervalMS = 100 + + // stop syncing when last block's time is + // within this much of the system time. + stopSyncingDurationMinutes = 10 +) + +type stateResetter interface { + ResetToState(*sm.State) +} + +// BlockchainReactor handles long-term catchup syncing. 
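+// It requests blocks from peers over BlockchainChannel, feeds the responses
+// into a BlockPool, and in poolRoutine() validates each block against the
+// next block's Validation before appending it to the state and block store.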
+type BlockchainReactor struct { + sw *p2p.Switch + state *sm.State + store *BlockStore + pool *BlockPool + sync bool + requestsCh chan BlockRequest + timeoutsCh chan string + lastBlock *types.Block + quit chan struct{} + running uint32 +} + +func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor { + if state.LastBlockHeight != store.Height() { + panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) + } + requestsCh := make(chan BlockRequest, defaultChannelCapacity) + timeoutsCh := make(chan string, defaultChannelCapacity) + pool := NewBlockPool( + store.Height()+1, + requestsCh, + timeoutsCh, + ) + bcR := &BlockchainReactor{ + state: state, + store: store, + pool: pool, + sync: sync, + requestsCh: requestsCh, + timeoutsCh: timeoutsCh, + quit: make(chan struct{}), + running: uint32(0), + } + return bcR +} + +// Implements Reactor +func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { + if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) { + log.Info("Starting BlockchainReactor") + bcR.sw = sw + bcR.pool.Start() + if bcR.sync { + go bcR.poolRoutine() + } + } +} + +// Implements Reactor +func (bcR *BlockchainReactor) Stop() { + if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) { + log.Info("Stopping BlockchainReactor") + close(bcR.quit) + bcR.pool.Stop() + } +} + +// Implements Reactor +func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + &p2p.ChannelDescriptor{ + Id: BlockchainChannel, + Priority: 5, + SendQueueCapacity: 100, + }, + } +} + +// Implements Reactor +func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { + // Send peer our state. + peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()}) +} + +// Implements Reactor +func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + // Remove peer from the pool. + bcR.pool.RemovePeer(peer.Key) +} + +// Implements Reactor +func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { + _, msg_, err := DecodeMessage(msgBytes) + if err != nil { + log.Warn("Error decoding message", "error", err) + return + } + + log.Info("Received message", "msg", msg_) + + switch msg := msg_.(type) { + case bcBlockRequestMessage: + // Got a request for a block. Respond with block if we have it. + block := bcR.store.LoadBlock(msg.Height) + if block != nil { + msg := bcBlockResponseMessage{Block: block} + queued := src.TrySend(BlockchainChannel, msg) + if !queued { + // queue is full, just ignore. + } + } else { + // TODO peer is asking for things we don't have. + } + case bcBlockResponseMessage: + // Got a block. + bcR.pool.AddBlock(msg.Block, src.Key) + case bcPeerStatusMessage: + // Got a peer status. + bcR.pool.SetPeerHeight(src.Key, msg.Height) + default: + // Ignore unknown message + } +} + +// Handle messages from the poolReactor telling the reactor what to do. +func (bcR *BlockchainReactor) poolRoutine() { + + trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + +FOR_LOOP: + for { + select { + case request := <-bcR.requestsCh: // chan BlockRequest + peer := bcR.sw.Peers().Get(request.PeerId) + if peer == nil { + // We can't fulfill the request. + continue FOR_LOOP + } + msg := bcBlockRequestMessage{request.Height} + queued := peer.TrySend(BlockchainChannel, msg) + if !queued { + // We couldn't queue the request. 
+ time.Sleep(defaultSleepIntervalMS * time.Millisecond) + continue FOR_LOOP + } + case peerId := <-bcR.timeoutsCh: // chan string + // Peer timed out. + peer := bcR.sw.Peers().Get(peerId) + if peer != nil { + bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + } + case _ = <-trySyncTicker.C: // chan time + //var lastValidatedBlock *types.Block + SYNC_LOOP: + for i := 0; i < 10; i++ { + // See if there are any blocks to sync. + first, second := bcR.pool.PeekTwoBlocks() + //log.Debug("TrySync peeked", "first", first, "second", second) + if first == nil || second == nil { + // We need both to sync the first block. + break SYNC_LOOP + } + firstParts := first.MakePartSet() + firstPartsHeader := firstParts.Header() + // Finally, verify the first block using the second's validation. + err := bcR.state.BondedValidators.VerifyValidation( + first.Hash(), firstPartsHeader, first.Height, second.Validation) + if err != nil { + log.Debug("error in validation", "error", err) + bcR.pool.RedoRequest(first.Height) + break SYNC_LOOP + } else { + bcR.pool.PopRequest() + err := bcR.state.AppendBlock(first, firstPartsHeader) + if err != nil { + // TODO This is bad, are we zombie? + panic(Fmt("Failed to process committed block: %v", err)) + } + bcR.store.SaveBlock(first, firstParts, second.Validation) + bcR.state.Save() + //lastValidatedBlock = first + } + } + /* + // We're done syncing for now (will do again shortly) + // See if we want to stop syncing and turn on the + // consensus reactor. + // TODO: use other heuristics too besides blocktime. + // It's not a security concern, as it only needs to happen + // upon node sync, and there's also a second (slower) + // method of syncing in the consensus reactor. + + if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { + go func() { + log.Info("Stopping blockpool syncing, turning on consensus...") + trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. + conR := bcR.sw.Reactor("CONSENSUS") + conR.(stateResetter).ResetToState(bcR.state) + conR.Start(bcR.sw) + for _, peer := range bcR.sw.Peers().List() { + conR.AddPeer(peer) + } + }() + break FOR_LOOP + } + */ + continue FOR_LOOP + case <-bcR.quit: + break FOR_LOOP + } + } +} + +func (bcR *BlockchainReactor) BroadcastStatus() error { + bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()}) + return nil +} + +//----------------------------------------------------------------------------- +// Messages + +const ( + msgTypeUnknown = byte(0x00) + msgTypeBlockRequest = byte(0x10) + msgTypeBlockResponse = byte(0x11) + msgTypePeerStatus = byte(0x20) +) + +// TODO: check for unnecessary extra bytes at the end. 
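+// The first byte selects the message type; the buffer is then decoded with
+// binary.ReadBinary into the matching message struct. Unknown types return a
+// nil msg.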
+func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) { + n := new(int64) + msgType = bz[0] + r := bytes.NewReader(bz) + switch msgType { + case msgTypeBlockRequest: + msg = binary.ReadBinary(bcBlockRequestMessage{}, r, n, &err) + case msgTypeBlockResponse: + msg = binary.ReadBinary(bcBlockResponseMessage{}, r, n, &err) + case msgTypePeerStatus: + msg = binary.ReadBinary(bcPeerStatusMessage{}, r, n, &err) + default: + msg = nil + } + return +} + +//------------------------------------- + +type bcBlockRequestMessage struct { + Height uint +} + +func (m bcBlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest } + +func (m bcBlockRequestMessage) String() string { + return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcBlockResponseMessage struct { + Block *types.Block +} + +func (m bcBlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse } + +func (m bcBlockResponseMessage) String() string { + return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) +} + +//------------------------------------- + +type bcPeerStatusMessage struct { + Height uint +} + +func (m bcPeerStatusMessage) TypeByte() byte { return msgTypePeerStatus } + +func (m bcPeerStatusMessage) String() string { + return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height) +} diff --git a/blockchain/store.go b/blockchain/store.go index 8938273cb..f9d54cd23 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -57,7 +57,7 @@ func (bs *BlockStore) LoadBlock(height uint) *types.Block { if r == nil { panic(Fmt("Block does not exist at height %v", height)) } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) + meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta) if err != nil { panic(Fmt("Error reading block meta: %v", err)) } @@ -87,14 +87,14 @@ func (bs *BlockStore) LoadBlockPart(height uint, index uint) *types.Part { return part } -func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta { +func (bs *BlockStore) LoadBlockMeta(height uint) *types.BlockMeta { var n int64 var err error r := bs.GetReader(calcBlockMetaKey(height)) if r == nil { panic(Fmt("BlockMeta does not exist for height %v", height)) } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) + meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta) if err != nil { panic(Fmt("Error reading block meta: %v", err)) } @@ -150,7 +150,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s } // Save block meta - meta := makeBlockMeta(block, blockParts) + meta := types.NewBlockMeta(block, blockParts) metaBytes := binary.BinaryBytes(meta) bs.db.Set(calcBlockMetaKey(height), metaBytes) @@ -184,22 +184,6 @@ func (bs *BlockStore) saveBlockPart(height uint, index uint, part *types.Part) { //----------------------------------------------------------------------------- -type BlockMeta struct { - Hash []byte // The block hash - Header *types.Header // The block's Header - Parts types.PartSetHeader // The PartSetHeader, for transfer -} - -func makeBlockMeta(block *types.Block, blockParts *types.PartSet) *BlockMeta { - return &BlockMeta{ - Hash: block.Hash(), - Header: block.Header, - Parts: blockParts.Header(), - } -} - -//----------------------------------------------------------------------------- - func calcBlockMetaKey(height uint) []byte { return []byte(fmt.Sprintf("H:%v", height)) } diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 
de9b71fae..e2a5e1834 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -1,44 +1,65 @@ package common import "time" +import "sync" /* RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period. It's good for keeping connections alive. */ type RepeatTimer struct { - Name string - Ch chan struct{} - quit chan struct{} - dur time.Duration - timer *time.Timer + Ch chan time.Time + + mtx sync.Mutex + name string + ticker *time.Ticker + quit chan struct{} + dur time.Duration } func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { - var ch = make(chan struct{}) - var quit = make(chan struct{}) - var t = &RepeatTimer{Name: name, Ch: ch, dur: dur, quit: quit} - t.timer = time.AfterFunc(dur, t.fireRoutine) + var t = &RepeatTimer{ + Ch: make(chan time.Time), + ticker: time.NewTicker(dur), + quit: make(chan struct{}), + name: name, + dur: dur, + } + go t.fireRoutine(t.ticker) return t } -func (t *RepeatTimer) fireRoutine() { - select { - case t.Ch <- struct{}{}: - t.timer.Reset(t.dur) - case <-t.quit: - // do nothing - default: - t.timer.Reset(t.dur) +func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) { + for { + select { + case t_ := <-ticker.C: + t.Ch <- t_ + case <-t.quit: + return + } } } // Wait the duration again before firing. func (t *RepeatTimer) Reset() { - t.timer.Reset(t.dur) + t.mtx.Lock() // Lock + defer t.mtx.Unlock() + + if t.ticker != nil { + t.ticker.Stop() + } + t.ticker = time.NewTicker(t.dur) + go t.fireRoutine(t.ticker) } func (t *RepeatTimer) Stop() bool { - close(t.quit) - return t.timer.Stop() + t.mtx.Lock() // Lock + defer t.mtx.Unlock() + + exists := t.ticker != nil + if exists { + t.ticker.Stop() + t.ticker = nil + } + return exists } diff --git a/config/config.go b/config/config.go index 0abb40d03..48973a162 100644 --- a/config/config.go +++ b/config/config.go @@ -104,6 +104,8 @@ func initDefaults(rootDir string) { app.SetDefault("GenesisFile", rootDir+"/genesis.json") app.SetDefault("AddrBookFile", rootDir+"/addrbook.json") app.SetDefault("PrivValidatorfile", rootDir+"/priv_validator.json") + + app.SetDefault("FastSync", false) } func Init(rootDir string) { @@ -161,6 +163,7 @@ func ParseFlags(args []string) { flags.BoolVar(&printHelp, "help", false, "Print this help message.") flags.String("listen_addr", app.GetString("ListenAddr"), "Listen address. (0.0.0.0:0 means any interface, any port)") flags.String("seed_node", app.GetString("SeedNode"), "Address of seed node") + flags.Bool("fast_sync", app.GetBool("FastSync"), "Fast blockchain syncing") flags.String("rpc_http_listen_addr", app.GetString("RPC.HTTP.ListenAddr"), "RPC listen address. Port required") flags.Parse(args) if printHelp { @@ -171,6 +174,7 @@ func ParseFlags(args []string) { // Merge parsed flag values onto app. app.BindPFlag("ListenAddr", flags.Lookup("listen_addr")) app.BindPFlag("SeedNode", flags.Lookup("seed_node")) + app.BindPFlag("FastSync", flags.Lookup("fast_sync")) app.BindPFlag("RPC.HTTP.ListenAddr", flags.Lookup("rpc_http_listen_addr")) // Confused? diff --git a/consensus/pol.go b/consensus/pol.go index c87b4ee5d..06784d588 100644 --- a/consensus/pol.go +++ b/consensus/pol.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/tendermint/tendermint/account" + "github.com/tendermint/tendermint/binary" . 
"github.com/tendermint/tendermint/common" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -94,3 +95,7 @@ func (pol *POL) StringShort() string { Fingerprint(pol.BlockHash), pol.BlockParts) } } + +func (pol *POL) MakePartSet() *types.PartSet { + return types.NewPartSetFromData(binary.BinaryBytes(pol)) +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 83c244995..a3028b6d5 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/binary" + bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" @@ -17,9 +18,9 @@ import ( ) const ( - StateCh = byte(0x20) - DataCh = byte(0x21) - VoteCh = byte(0x22) + StateChannel = byte(0x20) + DataChannel = byte(0x21) + VoteChannel = byte(0x22) peerStateKey = "ConsensusReactor.peerState" @@ -28,17 +29,18 @@ const ( //----------------------------------------------------------------------------- +// The reactor's underlying ConsensusState may change state at any time. +// We atomically copy the RoundState struct before using it. type ConsensusReactor struct { sw *p2p.Switch - started uint32 - stopped uint32 + running uint32 quit chan struct{} - blockStore *types.BlockStore + blockStore *bc.BlockStore conS *ConsensusState } -func NewConsensusReactor(consensusState *ConsensusState, blockStore *types.BlockStore) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { conR := &ConsensusReactor{ blockStore: blockStore, quit: make(chan struct{}), @@ -49,7 +51,7 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *types.Block // Implements Reactor func (conR *ConsensusReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&conR.started, 0, 1) { + if atomic.CompareAndSwapUint32(&conR.running, 0, 1) { log.Info("Starting ConsensusReactor") conR.sw = sw conR.conS.Start() @@ -59,15 +61,15 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) { // Implements Reactor func (conR *ConsensusReactor) Stop() { - if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&conR.running, 1, 0) { log.Info("Stopping ConsensusReactor") conR.conS.Stop() close(conR.quit) } } -func (conR *ConsensusReactor) IsStopped() bool { - return atomic.LoadUint32(&conR.stopped) == 1 +func (conR *ConsensusReactor) IsRunning() bool { + return atomic.LoadUint32(&conR.running) == 1 } // Implements Reactor @@ -75,15 +77,15 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: StateCh, + Id: StateChannel, Priority: 5, }, &p2p.ChannelDescriptor{ - Id: DataCh, + Id: DataChannel, Priority: 5, }, &p2p.ChannelDescriptor{ - Id: VoteCh, + Id: VoteChannel, Priority: 5, }, } @@ -91,6 +93,10 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { + if !conR.IsRunning() { + return + } + // Create peerState for peer peerState := NewPeerState(peer) peer.Data.Set(peerStateKey, peerState) @@ -105,11 +111,18 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Implements Reactor func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + if !conR.IsRunning() { + return + } + 
//peer.Data.Get(peerStateKey).(*PeerState).Disconnect() } // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { + if !conR.IsRunning() { + return + } // Get round state rs := conR.conS.GetRoundState() @@ -122,7 +135,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes) switch chId { - case StateCh: + case StateChannel: switch msg := msg_.(type) { case *NewRoundStepMessage: ps.ApplyNewRoundStepMessage(msg, rs) @@ -134,7 +147,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Ignore unknown message } - case DataCh: + case DataChannel: switch msg := msg_.(type) { case *Proposal: ps.SetHasProposal(msg) @@ -155,7 +168,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Ignore unknown message } - case VoteCh: + case VoteChannel: switch msg := msg_.(type) { case *VoteMessage: vote := msg.Vote @@ -192,7 +205,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte Type: vote.Type, Index: index, } - conR.sw.Broadcast(StateCh, msg) + conR.sw.Broadcast(StateChannel, msg) } default: @@ -212,6 +225,11 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } +// Reset to some state. +func (conR *ConsensusReactor) ResetToState(state *sm.State) { + conR.conS.updateToState(state, false) +} + //-------------------------------------- func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { @@ -252,10 +270,10 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - conR.sw.Broadcast(StateCh, nrsMsg) + conR.sw.Broadcast(StateChannel, nrsMsg) } if csMsg != nil { - conR.sw.Broadcast(StateCh, csMsg) + conR.sw.Broadcast(StateChannel, csMsg) } } } @@ -264,10 +282,10 @@ func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - peer.Send(StateCh, nrsMsg) + peer.Send(StateChannel, nrsMsg) } if csMsg != nil { - peer.Send(StateCh, nrsMsg) + peer.Send(StateChannel, nrsMsg) } } @@ -276,7 +294,7 @@ func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) { OUTER_LOOP: for { // Manage disconnects from self or peer. 
- if peer.IsStopped() || conR.IsStopped() { + if !peer.IsRunning() || !conR.IsRunning() { log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer)) return } @@ -296,7 +314,7 @@ OUTER_LOOP: Type: partTypeProposalBlock, Part: part, } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalBlockPart(rs.Height, rs.Round, index) continue OUTER_LOOP } @@ -306,7 +324,7 @@ OUTER_LOOP: if 0 < prs.Height && prs.Height < rs.Height { //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray) if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { - // Ensure that the peer's PartSetHeaeder is correct + // Ensure that the peer's PartSetHeader is correct blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) if !blockMeta.Parts.Equals(prs.ProposalBlockParts) { log.Debug("Peer ProposalBlockParts mismatch, sleeping", @@ -329,7 +347,7 @@ OUTER_LOOP: Type: partTypeProposalBlock, Part: part, } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) continue OUTER_LOOP } else { @@ -349,7 +367,7 @@ OUTER_LOOP: // Send proposal? if rs.Proposal != nil && !prs.Proposal { msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal} - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposal(rs.Proposal) continue OUTER_LOOP } @@ -363,7 +381,7 @@ OUTER_LOOP: Type: partTypeProposalPOL, Part: rs.ProposalPOLParts.GetPart(index), } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalPOLPart(rs.Height, rs.Round, index) continue OUTER_LOOP } @@ -379,7 +397,7 @@ func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) OUTER_LOOP: for { // Manage disconnects from self or peer. - if peer.IsStopped() || conR.IsStopped() { + if !peer.IsRunning() || !conR.IsRunning() { log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer)) return } @@ -397,7 +415,7 @@ OUTER_LOOP: vote := voteSet.GetByIndex(index) // NOTE: vote may be a commit. msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) + peer.Send(VoteChannel, msg) ps.SetHasVote(vote, index) return true } @@ -421,7 +439,7 @@ OUTER_LOOP: Signature: commit.Signature, } msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) + peer.Send(VoteChannel, msg) ps.SetHasVote(vote, index) return true } diff --git a/consensus/state.go b/consensus/state.go index d44dd7b5a..683612496 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -62,6 +62,7 @@ import ( "github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/binary" + bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" . "github.com/tendermint/tendermint/consensus/types" @@ -234,7 +235,7 @@ type ConsensusState struct { stopped uint32 quit chan struct{} - blockStore *types.BlockStore + blockStore *bc.BlockStore mempoolReactor *mempl.MempoolReactor runActionCh chan RoundAction newStepCh chan *RoundState @@ -247,7 +248,7 @@ type ConsensusState struct { lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. 
} -func NewConsensusState(state *sm.State, blockStore *types.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { +func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { cs := &ConsensusState{ quit: make(chan struct{}), blockStore: blockStore, @@ -255,7 +256,7 @@ func NewConsensusState(state *sm.State, blockStore *types.BlockStore, mempoolRea runActionCh: make(chan RoundAction, 1), newStepCh: make(chan *RoundState, 1), } - cs.updateToState(state) + cs.updateToState(state, true) return cs } @@ -456,9 +457,9 @@ ACTION_LOOP: // If calculated round is greater than 0 (based on BlockTime or calculated StartTime) // then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound. // Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight. -func (cs *ConsensusState) updateToState(state *sm.State) { +func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { // Sanity check state. - if cs.Height > 0 && cs.Height != state.LastBlockHeight { + if contiguous && cs.Height > 0 && cs.Height != state.LastBlockHeight { panic(Fmt("updateToState() expected state height of %v but found %v", cs.Height, state.LastBlockHeight)) } @@ -466,6 +467,8 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // Reset fields based on state. validators := state.BondedValidators height := state.LastBlockHeight + 1 // next desired block height + + // RoundState fields cs.Height = height cs.Round = 0 cs.Step = RoundStepNewHeight @@ -641,12 +644,12 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { return } - blockParts = types.NewPartSetFromData(binary.BinaryBytes(block)) + blockParts = block.MakePartSet() pol = cs.LockedPOL // If exists, is a PoUnlock. } if pol != nil { - polParts = types.NewPartSetFromData(binary.BinaryBytes(pol)) + polParts = pol.MakePartSet() } // Make proposal @@ -856,7 +859,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool { // We have the block, so save/stage/sign-commit-vote. cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits) // Increment height. - cs.updateToState(cs.stagedState) + cs.updateToState(cs.stagedState, true) // cs.Step is now RoundStepNewHeight or RoundStepNewRound cs.newStepCh <- cs.getRoundState() return true diff --git a/consensus/test.go b/consensus/test.go index e86c6a075..397befa0d 100644 --- a/consensus/test.go +++ b/consensus/test.go @@ -3,15 +3,15 @@ package consensus import ( "sort" + bc "github.com/tendermint/tendermint/blockchain" dbm "github.com/tendermint/tendermint/db" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/types" ) func randConsensusState() (*ConsensusState, []*sm.PrivValidator) { state, _, privValidators := sm.RandGenesisState(20, false, 1000, 10, false, 1000) - blockStore := types.NewBlockStore(dbm.NewMemDB()) + blockStore := bc.NewBlockStore(dbm.NewMemDB()) mempool := mempl.NewMempool(state) mempoolReactor := mempl.NewMempoolReactor(mempool) cs := NewConsensusState(state, blockStore, mempoolReactor) diff --git a/consensus/vote_set.go b/consensus/vote_set.go index c640fc51b..973d4b7a1 100644 --- a/consensus/vote_set.go +++ b/consensus/vote_set.go @@ -34,7 +34,7 @@ type VoteSet struct { maj23Exists bool } -// Constructs a new VoteSet struct used to accumulate votes for each round. +// Constructs a new VoteSet struct used to accumulate votes for given height/round. 
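+// Panics if height == 0.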
func NewVoteSet(height uint, round uint, type_ byte, valSet *sm.ValidatorSet) *VoteSet { if height == 0 { panic("Cannot make VoteSet for height == 0, doesn't make sense.") diff --git a/daemon/daemon.go b/daemon/daemon.go index ccb2932de..dc43e2fa1 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" + bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" @@ -12,15 +13,15 @@ import ( "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/rpc" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/types" ) type Node struct { lz []p2p.Listener sw *p2p.Switch book *p2p.AddrBook + blockStore *bc.BlockStore pexReactor *p2p.PEXReactor - blockStore *types.BlockStore + bcReactor *bc.BlockchainReactor mempoolReactor *mempl.MempoolReactor consensusState *consensus.ConsensusState consensusReactor *consensus.ConsensusReactor @@ -30,7 +31,7 @@ type Node struct { func NewNode() *Node { // Get BlockStore blockStoreDB := dbm.GetDB("blockstore") - blockStore := types.NewBlockStore(blockStoreDB) + blockStore := bc.NewBlockStore(blockStoreDB) // Get State stateDB := dbm.GetDB("state") @@ -53,6 +54,9 @@ func NewNode() *Node { book := p2p.NewAddrBook(config.App().GetString("AddrBookFile")) pexReactor := p2p.NewPEXReactor(book) + // Get BlockchainReactor + bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync")) + // Get MempoolReactor mempool := mempl.NewMempool(state.Copy()) mempoolReactor := mempl.NewMempoolReactor(mempool) @@ -64,14 +68,23 @@ func NewNode() *Node { consensusReactor.SetPrivValidator(privValidator) } - sw := p2p.NewSwitch([]p2p.Reactor{pexReactor, mempoolReactor, consensusReactor}) - sw.SetChainId(state.Hash(), config.App().GetString("Network")) + sw := p2p.NewSwitch() + sw.SetNetwork(config.App().GetString("Network")) + sw.AddReactor("PEX", pexReactor).Start(sw) + sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw) + sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw) + if !config.App().GetBool("FastSync") { + sw.AddReactor("CONSENSUS", consensusReactor).Start(sw) + } else { + sw.AddReactor("CONSENSUS", consensusReactor) + } return &Node{ sw: sw, book: book, - pexReactor: pexReactor, blockStore: blockStore, + pexReactor: pexReactor, + bcReactor: bcReactor, mempoolReactor: mempoolReactor, consensusState: consensusState, consensusReactor: consensusReactor, @@ -85,7 +98,7 @@ func (n *Node) Start() { go n.inboundConnectionRoutine(l) } n.book.Start() - n.sw.Start() + //n.sw.StartReactors() } func (n *Node) Stop() { diff --git a/mempool/reactor.go b/mempool/reactor.go index 5bed4e18b..e16cf9332 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -11,7 +11,7 @@ import ( ) var ( - MempoolCh = byte(0x30) + MempoolChannel = byte(0x30) ) // MempoolReactor handles mempool tx broadcasting amongst peers. 
@@ -52,7 +52,7 @@ func (memR *MempoolReactor) Stop() { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: MempoolCh, + Id: MempoolChannel, Priority: 5, }, } @@ -92,7 +92,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { if peer.Key == src.Key { continue } - peer.TrySend(MempoolCh, msg) + peer.TrySend(MempoolChannel, msg) } default: @@ -106,7 +106,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return err } msg := &TxMessage{Tx: tx} - memR.sw.Broadcast(MempoolCh, msg) + memR.sw.Broadcast(MempoolChannel, msg) return nil } diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 43cad607c..893cf1c9e 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -381,7 +381,7 @@ out: for { select { case <-dumpAddressTicker.C: - log.Debug("Saving book to file", "size", a.Size()) + log.Debug("Saving AddrBook to file", "size", a.Size()) a.saveToFile(a.filePath) case <-a.quit: break out diff --git a/p2p/connection.go b/p2p/connection.go index 89086bc2a..c75538365 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -50,8 +50,9 @@ There are two methods for sending messages: func (m MConnection) TrySend(chId byte, msg interface{}) bool {} `Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued -for the channel with the given id byte `chId`. The message `msg` is serialized -using the `tendermint/binary` submodule's `WriteBinary()` reflection routine. +for the channel with the given id byte `chId`, or until the request times out. +The message `msg` is serialized using the `tendermint/binary` submodule's +`WriteBinary()` reflection routine. `TrySend(chId, msg)` is a nonblocking call that returns false if the channel's queue is full. @@ -416,6 +417,7 @@ FOR_LOOP: } msgBytes := channel.recvMsgPacket(pkt) if msgBytes != nil { + log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes) c.onReceive(pkt.ChannelId, msgBytes) } default: @@ -437,8 +439,19 @@ FOR_LOOP: //----------------------------------------------------------------------------- type ChannelDescriptor struct { - Id byte - Priority uint + Id byte + Priority uint + SendQueueCapacity uint + RecvBufferCapacity uint +} + +func (chDesc *ChannelDescriptor) FillDefaults() { + if chDesc.SendQueueCapacity == 0 { + chDesc.SendQueueCapacity = defaultSendQueueCapacity + } + if chDesc.RecvBufferCapacity == 0 { + chDesc.RecvBufferCapacity = defaultRecvBufferCapacity + } } // TODO: lowercase. @@ -448,7 +461,7 @@ type Channel struct { desc *ChannelDescriptor id byte sendQueue chan []byte - sendQueueSize uint32 + sendQueueSize uint32 // atomic. 
recving []byte sending []byte priority uint @@ -456,6 +469,7 @@ type Channel struct { } func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { + desc.FillDefaults() if desc.Priority <= 0 { panic("Channel default priority must be a postive integer") } @@ -463,8 +477,8 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { conn: conn, desc: desc, id: desc.Id, - sendQueue: make(chan []byte, defaultSendQueueCapacity), - recving: make([]byte, 0, defaultRecvBufferCapacity), + sendQueue: make(chan []byte, desc.SendQueueCapacity), + recving: make([]byte, 0, desc.RecvBufferCapacity), priority: desc.Priority, } } diff --git a/p2p/peer.go b/p2p/peer.go index 68137a63a..173297eb0 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -13,8 +13,7 @@ import ( type Peer struct { outbound bool mconn *MConnection - started uint32 - stopped uint32 + running uint32 Key string Data *CMap // User data. @@ -37,7 +36,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc p = &Peer{ outbound: outbound, mconn: mconn, - stopped: 0, + running: 0, Key: mconn.RemoteAddress.String(), Data: NewCMap(), } @@ -45,21 +44,21 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc } func (p *Peer) start() { - if atomic.CompareAndSwapUint32(&p.started, 0, 1) { + if atomic.CompareAndSwapUint32(&p.running, 0, 1) { log.Debug("Starting Peer", "peer", p) p.mconn.Start() } } func (p *Peer) stop() { - if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&p.running, 1, 0) { log.Debug("Stopping Peer", "peer", p) p.mconn.Stop() } } -func (p *Peer) IsStopped() bool { - return atomic.LoadUint32(&p.stopped) == 1 +func (p *Peer) IsRunning() bool { + return atomic.LoadUint32(&p.running) == 1 } func (p *Peer) Connection() *MConnection { @@ -71,21 +70,21 @@ func (p *Peer) IsOutbound() bool { } func (p *Peer) Send(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.Send(chId, msg) } func (p *Peer) TrySend(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.TrySend(chId, msg) } func (p *Peer) CanSend(chId byte) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.CanSend(chId) diff --git a/p2p/peer_set.go b/p2p/peer_set.go index b4230ffa3..effad6dcc 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -7,6 +7,7 @@ import ( // IPeerSet has a (immutable) subset of the methods of PeerSet. 
type IPeerSet interface { Has(key string) bool + Get(key string) *Peer List() []*Peer Size() int } @@ -55,6 +56,17 @@ func (ps *PeerSet) Has(peerKey string) bool { return ok } +func (ps *PeerSet) Get(peerKey string) *Peer { + ps.mtx.Lock() + defer ps.mtx.Unlock() + item, ok := ps.lookup[peerKey] + if ok { + return item.peer + } else { + return nil + } +} + func (ps *PeerSet) Remove(peer *Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 90be9b24c..5ba238d31 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -14,7 +14,7 @@ import ( var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( - PexCh = byte(0x00) + PexChannel = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 maxNumPeers = 50 @@ -62,8 +62,9 @@ func (pexR *PEXReactor) Stop() { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexCh, - Priority: 1, + Id: PexChannel, + Priority: 1, + SendQueueCapacity: 10, }, } } @@ -97,9 +98,9 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { switch msg.(type) { case *pexHandshakeMessage: - chainId := msg.(*pexHandshakeMessage).ChainId - if chainId != pexR.sw.chainId { - err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", chainId, pexR.sw.chainId) + network := msg.(*pexHandshakeMessage).Network + if network != pexR.sw.network { + err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network) pexR.sw.StopPeerForError(src, err) } case *pexRequestMessage: @@ -122,11 +123,11 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { // Asks peer for more addresses. func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.Send(PexCh, &pexRequestMessage{}) + peer.Send(PexChannel, &pexRequestMessage{}) } func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs}) + peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs}) } // Ensures that sufficient peers are connected. (continuous) @@ -175,10 +176,12 @@ func (pexR *PEXReactor) ensurePeers() { alreadyDialing := pexR.sw.IsDialing(try) alreadyConnected := pexR.sw.Peers().Has(try.String()) if alreadySelected || alreadyDialing || alreadyConnected { - log.Debug("Cannot dial address", "addr", try, - "alreadySelected", alreadySelected, - "alreadyDialing", alreadyDialing, - "alreadyConnected", alreadyConnected) + /* + log.Debug("Cannot dial address", "addr", try, + "alreadySelected", alreadySelected, + "alreadyDialing", alreadyDialing, + "alreadyConnected", alreadyConnected) + */ continue } else { log.Debug("Will dial address", "addr", try) @@ -237,7 +240,7 @@ func DecodeMessage(bz []byte) (msg interface{}, err error) { A pexHandshakeMessage contains the peer's chainId */ type pexHandshakeMessage struct { - ChainId string + Network string } func (m *pexHandshakeMessage) TypeByte() byte { return msgTypeHandshake } diff --git a/p2p/switch.go b/p2p/switch.go index 635e0ecaa..1eb513089 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1,11 +1,9 @@ package p2p import ( - "encoding/hex" "errors" "fmt" "net" - "sync/atomic" "time" . "github.com/tendermint/tendermint/common" @@ -29,20 +27,16 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. 
*/ type Switch struct { - reactors []Reactor + network string + reactors map[string]Reactor chDescs []*ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap listeners *CMap // listenerName -> chan interface{} - quit chan struct{} - started uint32 - stopped uint32 - chainId string } var ( - ErrSwitchStopped = errors.New("Switch already stopped") ErrSwitchDuplicatePeer = errors.New("Duplicate peer") ) @@ -50,71 +44,83 @@ const ( peerDialTimeoutSeconds = 3 ) -func NewSwitch(reactors []Reactor) *Switch { - - // Validate the reactors. no two reactors can share the same channel. - chDescs := []*ChannelDescriptor{} - reactorsByCh := make(map[byte]Reactor) - for _, reactor := range reactors { - reactorChannels := reactor.GetChannels() - for _, chDesc := range reactorChannels { - chId := chDesc.Id - if reactorsByCh[chId] != nil { - panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor)) - } - chDescs = append(chDescs, chDesc) - reactorsByCh[chId] = reactor - } - } +func NewSwitch() *Switch { sw := &Switch{ - reactors: reactors, - chDescs: chDescs, - reactorsByCh: reactorsByCh, + network: "", + reactors: make(map[string]Reactor), + chDescs: make([]*ChannelDescriptor, 0), + reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), listeners: NewCMap(), - quit: make(chan struct{}), - stopped: 0, } return sw } -func (sw *Switch) Start() { - if atomic.CompareAndSwapUint32(&sw.started, 0, 1) { - log.Info("Starting Switch") - for _, reactor := range sw.reactors { - reactor.Start(sw) +// Not goroutine safe. +func (sw *Switch) SetNetwork(network string) { + sw.network = network +} + +// Not goroutine safe. +func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { + // Validate the reactor. + // No two reactors can share the same channel. + reactorChannels := reactor.GetChannels() + for _, chDesc := range reactorChannels { + chId := chDesc.Id + if sw.reactorsByCh[chId] != nil { + panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor)) } + sw.chDescs = append(sw.chDescs, chDesc) + sw.reactorsByCh[chId] = reactor + } + sw.reactors[name] = reactor + return reactor +} + +func (sw *Switch) Reactor(name string) Reactor { + return sw.reactors[name] +} + +// Convenience function +func (sw *Switch) StartReactors() { + for _, reactor := range sw.reactors { + reactor.Start(sw) } } +// Convenience function +func (sw *Switch) StopReactors() { + // Stop all reactors. + for _, reactor := range sw.reactors { + reactor.Stop() + } +} + +// Convenience function +func (sw *Switch) StopPeers() { + // Stop each peer. + for _, peer := range sw.peers.List() { + peer.stop() + } + sw.peers = NewPeerSet() +} + +// Convenience function func (sw *Switch) Stop() { - if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) { - log.Info("Stopping Switch") - close(sw.quit) - // Stop each peer. - for _, peer := range sw.peers.List() { - peer.stop() - } - sw.peers = NewPeerSet() - // Stop all reactors. - for _, reactor := range sw.reactors { - reactor.Stop() - } - } + sw.StopPeers() + sw.StopReactors() } -func (sw *Switch) Reactors() []Reactor { +// Not goroutine safe to modify. 
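// AddReactor above moves the "no two reactors may share a channel id" check out of
// NewSwitch and runs it on every registration. A standalone sketch of that registration
// logic, assuming illustrative reactor/registry types rather than the real p2p.Reactor
// interface:
package main

import "fmt"

type reactor struct {
	name     string
	channels []byte // channel ids this reactor claims
}

type registry struct {
	byName map[string]*reactor
	byCh   map[byte]*reactor
}

func newRegistry() *registry {
	return &registry{byName: make(map[string]*reactor), byCh: make(map[byte]*reactor)}
}

// add registers a reactor under a name, panicking if any of its channel ids is taken.
func (r *registry) add(name string, re *reactor) *reactor {
	for _, ch := range re.channels {
		if other, ok := r.byCh[ch]; ok {
			panic(fmt.Sprintf("channel %X has multiple reactors %q & %q", ch, other.name, name))
		}
		r.byCh[ch] = re
	}
	r.byName[name] = re
	return re
}

func main() {
	reg := newRegistry()
	reg.add("foo", &reactor{name: "foo", channels: []byte{0x00, 0x01}})
	reg.add("bar", &reactor{name: "bar", channels: []byte{0x02, 0x03}})
	fmt.Println("registered:", len(reg.byName))
	// reg.add("baz", &reactor{name: "baz", channels: []byte{0x01}}) // would panic: 0x01 is taken
}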
+func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - if atomic.LoadUint32(&sw.stopped) == 1 { - return nil, ErrSwitchStopped - } - peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers @@ -126,23 +132,19 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } // Start the peer - go peer.start() + peer.start() // Notify listeners. sw.doAddPeer(peer) // Send handshake - msg := &pexHandshakeMessage{ChainId: sw.chainId} - peer.Send(PexCh, msg) + msg := &pexHandshakeMessage{Network: sw.network} + peer.Send(PexChannel, msg) return peer, nil } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - if atomic.LoadUint32(&sw.stopped) == 1 { - return nil, ErrSwitchStopped - } - log.Debug("Dialing address", "address", addr) sw.dialing.Set(addr.String(), addr) conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) @@ -164,13 +166,10 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.String()) } -// Broadcast runs a go routine for each attemptted send, which will block +// Broadcast runs a go routine for each attempted send, which will block // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { - if atomic.LoadUint32(&sw.stopped) == 1 { - return nil - } successChan := make(chan bool, len(sw.peers.List())) log.Debug("Broadcast", "channel", chId, "msg", msg) for _, peer := range sw.peers.List() { @@ -223,10 +222,6 @@ func (sw *Switch) StopPeerGracefully(peer *Peer) { sw.doRemovePeer(peer, nil) } -func (sw *Switch) SetChainId(hash []byte, network string) { - sw.chainId = hex.EncodeToString(hash) + "-" + network -} - func (sw *Switch) IsListening() bool { return sw.listeners.Size() > 0 } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index a260df27a..ffdb9950c 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -69,11 +69,11 @@ func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { //----------------------------------------------------------------------------- // convenience method for creating two switches connected to each other. -func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) { +func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) { // Create two switches that will be interconnected. 
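// Broadcast above spawns one goroutine per peer and reports each attempt on a channel
// buffered to len(peers), so the caller can tally results without blocking the senders
// (the switch_test benchmark below counts numSuccess/numFailure from such a channel).
// A self-contained sketch of that contract; peer and trySend are stand-ins, not the
// real p2p API.
package main

import (
	"fmt"
	"math/rand"
)

type peer struct{ key string }

// trySend stands in for a send that may time out; here it just fails ~25% of the time.
func (p *peer) trySend(msg string) bool {
	return rand.Intn(4) != 0
}

func broadcast(peers []*peer, msg string) chan bool {
	successChan := make(chan bool, len(peers))
	for _, p := range peers {
		go func(p *peer) { successChan <- p.trySend(msg) }(p)
	}
	return successChan
}

func main() {
	peers := []*peer{{"a"}, {"b"}, {"c"}, {"d"}}
	ch := broadcast(peers, "test data")
	numSuccess, numFailure := 0, 0
	for range peers {
		if <-ch {
			numSuccess++
		} else {
			numFailure++
		}
	}
	fmt.Printf("%d succeeded, %d failed\n", numSuccess, numFailure)
}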
- s1 := NewSwitch(reactorsGenerator()) - s2 := NewSwitch(reactorsGenerator()) + s1 := initSwitch(NewSwitch()) + s2 := initSwitch(NewSwitch()) // Create a listener for s1 l := NewDefaultListener("tcp", ":8001", true) @@ -104,18 +104,17 @@ func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, } func TestSwitches(t *testing.T) { - s1, s2 := makeSwitchPair(t, func() []Reactor { + s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch { // Make two reactors of two channels each - reactors := make([]Reactor, 2) - reactors[0] = NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, true) - reactors[1] = NewTestReactor([]*ChannelDescriptor{ + }, true)).Start(sw) // Start the reactor + sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, true) - return reactors + }, true)).Start(sw) // Start the reactor + return sw }) defer s1.Stop() defer s2.Stop() @@ -129,8 +128,8 @@ func TestSwitches(t *testing.T) { } ch0Msg := "channel zero" - ch1Msg := "channel one" - ch2Msg := "channel two" + ch1Msg := "channel foo" + ch2Msg := "channel bar" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) @@ -140,7 +139,7 @@ func TestSwitches(t *testing.T) { time.Sleep(5000 * time.Millisecond) // Check message on ch0 - ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)] + ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)] if len(ch0Msgs) != 2 { t.Errorf("Expected to have received 1 message in ch0") } @@ -149,7 +148,7 @@ func TestSwitches(t *testing.T) { } // Check message on ch1 - ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)] + ch1Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x01)] if len(ch1Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch1") } @@ -158,7 +157,7 @@ func TestSwitches(t *testing.T) { } // Check message on ch2 - ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)] + ch2Msgs := s2.Reactor("bar").(*TestReactor).msgsReceived[byte(0x02)] if len(ch2Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch2") } @@ -172,18 +171,17 @@ func BenchmarkSwitches(b *testing.B) { b.StopTimer() - s1, s2 := makeSwitchPair(b, func() []Reactor { - // Make two reactors of two channels each - reactors := make([]Reactor, 2) - reactors[0] = NewTestReactor([]*ChannelDescriptor{ + s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch { + // Make bar reactors of bar channels each + sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, false) - reactors[1] = NewTestReactor([]*ChannelDescriptor{ + }, false)) + sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, false) - return reactors + }, false)) + return sw }) defer s1.Stop() defer s2.Stop() @@ -194,7 +192,7 @@ func BenchmarkSwitches(b *testing.B) { numSuccess, numFailure := 0, 0 - // Send random message from one channel to another + // Send random message from foo channel to another for i := 0; i < b.N; i++ { chId := byte(i % 4) successChan := s1.Broadcast(chId, "test data") diff --git a/rpc/rpc.go b/rpc/rpc.go index 94631249c..8cf905d3a 100644 --- a/rpc/rpc.go +++ 
b/rpc/rpc.go @@ -1,18 +1,18 @@ package rpc import ( + bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" ) -var blockStore *types.BlockStore +var blockStore *bc.BlockStore var consensusState *consensus.ConsensusState var mempoolReactor *mempl.MempoolReactor var p2pSwitch *p2p.Switch -func SetRPCBlockStore(bs *types.BlockStore) { +func SetRPCBlockStore(bs *bc.BlockStore) { blockStore = bs } diff --git a/state/state_test.go b/state/state_test.go index da7960a77..0d2c963bb 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -2,7 +2,6 @@ package state import ( "github.com/tendermint/tendermint/account" - "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" @@ -91,7 +90,7 @@ func TestGenesisSaveLoad(t *testing.T) { // Make complete block and blockParts block := makeBlock(t, s0, nil, nil) - blockParts := types.NewPartSetFromData(binary.BinaryBytes(block)) + blockParts := block.MakePartSet() // Now append the block to s0. err := s0.AppendBlock(block, blockParts.Header()) @@ -338,7 +337,7 @@ func TestAddValidator(t *testing.T) { // Make complete block and blockParts block0 := makeBlock(t, s0, nil, []types.Tx{bondTx}) - block0Parts := types.NewPartSetFromData(binary.BinaryBytes(block0)) + block0Parts := block0.MakePartSet() // Sanity check if s0.BondedValidators.Size() != 1 { @@ -379,7 +378,7 @@ func TestAddValidator(t *testing.T) { }, }, nil, ) - block1Parts := types.NewPartSetFromData(binary.BinaryBytes(block1)) + block1Parts := block1.MakePartSet() err = s0.AppendBlock(block1, block1Parts.Header()) if err != nil { t.Error("Error appending secondary block:", err) diff --git a/state/validator_set.go b/state/validator_set.go index 50f76f423..09589a9db 100644 --- a/state/validator_set.go +++ b/state/validator_set.go @@ -2,12 +2,15 @@ package state import ( "bytes" + "errors" "fmt" "sort" "strings" + "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/merkle" + "github.com/tendermint/tendermint/types" ) // ValidatorSet represent a set of *Validator at a given height. @@ -198,6 +201,50 @@ func (valSet *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) { } } +// Verify that +2/3 of the set had signed the given signBytes +func (valSet *ValidatorSet) VerifyValidation(hash []byte, parts types.PartSetHeader, height uint, v *types.Validation) error { + if valSet.Size() != uint(len(v.Commits)) { + return errors.New(Fmt("Invalid validation -- wrong set size: %v vs %v", + valSet.Size(), len(v.Commits))) + } + + talliedVotingPower := uint64(0) + seenValidators := map[string]struct{}{} + + for idx, commit := range v.Commits { + // may be zero, in which case skip. 
+ if commit.Signature.IsZero() { + continue + } + _, val := valSet.GetByIndex(uint(idx)) + commitSignBytes := account.SignBytes(&types.Vote{ + Height: height, Round: commit.Round, Type: types.VoteTypeCommit, + BlockHash: hash, + BlockParts: parts, + }) + + // Validate + if _, seen := seenValidators[string(val.Address)]; seen { + return Errorf("Duplicate validator for commit %v for Validation %v", commit, v) + } + + if !val.PubKey.VerifyBytes(commitSignBytes, commit.Signature) { + return Errorf("Invalid signature for commit %v for Validation %v", commit, v) + } + + // Tally + seenValidators[string(val.Address)] = struct{}{} + talliedVotingPower += val.VotingPower + } + + if talliedVotingPower > valSet.TotalVotingPower()*2/3 { + return nil + } else { + return Errorf("insufficient voting power %v, needed %v", + talliedVotingPower, (valSet.TotalVotingPower()*2/3 + 1)) + } +} + func (valSet *ValidatorSet) String() string { return valSet.StringIndented("") } diff --git a/types/block.go b/types/block.go index d56f4e467..176ce4385 100644 --- a/types/block.go +++ b/types/block.go @@ -39,7 +39,9 @@ func (b *Block) ValidateBasic(lastBlockHeight uint, lastBlockHash []byte, if !b.LastBlockParts.Equals(lastBlockParts) { return errors.New("Wrong Block.Header.LastBlockParts") } - /* TODO: Determine bounds. + /* TODO: Determine bounds + See blockchain/reactor "stopSyncingDurationMinutes" + if !b.Time.After(lastBlockTime) { return errors.New("Invalid Block.Header.Time") } @@ -66,6 +68,10 @@ func (b *Block) Hash() []byte { return merkle.HashFromHashes(hashes) } +func (b *Block) MakePartSet() *PartSet { + return NewPartSetFromData(binary.BinaryBytes(b)) +} + // Convenience. // A nil block never hashes to anything. // Nothing hashes to a nil hash. diff --git a/types/block_meta.go b/types/block_meta.go new file mode 100644 index 000000000..3e9ba8f91 --- /dev/null +++ b/types/block_meta.go @@ -0,0 +1,15 @@ +package types + +type BlockMeta struct { + Hash []byte // The block hash + Header *Header // The block's Header + Parts PartSetHeader // The PartSetHeader, for transfer +} + +func NewBlockMeta(block *Block, blockParts *PartSet) *BlockMeta { + return &BlockMeta{ + Hash: block.Hash(), + Header: block.Header, + Parts: blockParts.Header(), + } +} diff --git a/types/store.go b/types/store.go deleted file mode 100644 index 3afc8cb58..000000000 --- a/types/store.go +++ /dev/null @@ -1,247 +0,0 @@ -package types - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - - "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" - dbm "github.com/tendermint/tendermint/db" -) - -/* -Simple low level store for blocks. - -There are three types of information stored: - - BlockMeta: Meta information about each block - - Block part: Parts of each block, aggregated w/ PartSet - - Validation: The Validation part of each block, for gossiping commit votes - -Currently the commit signatures are duplicated in the Block parts as -well as the Validation. In the future this may change, perhaps by moving -the Validation data outside the Block. -*/ -type BlockStore struct { - height uint - db dbm.DB -} - -func NewBlockStore(db dbm.DB) *BlockStore { - bsjson := LoadBlockStoreStateJSON(db) - return &BlockStore{ - height: bsjson.Height, - db: db, - } -} - -// Height() returns the last known contiguous block height. 
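// The heart of VerifyValidation above is the threshold check: voting power is tallied
// only for commits that actually carry a signature, and validation passes only when the
// tally is strictly greater than 2/3 of the total power (so "needed" is total*2/3 + 1).
// A tiny standalone sketch of that arithmetic with made-up powers, not the real
// ValidatorSet/Validation types:
package main

import "fmt"

func hasTwoThirdsMajority(powers []uint64, signed []bool) (ok bool, tallied, needed uint64) {
	var total uint64
	for i, p := range powers {
		total += p
		if signed[i] {
			tallied += p
		}
	}
	needed = total*2/3 + 1
	return tallied > total*2/3, tallied, needed
}

func main() {
	powers := []uint64{10, 10, 10} // three equal validators, total 30

	ok, got, needed := hasTwoThirdsMajority(powers, []bool{true, true, false})
	fmt.Println(ok, got, needed) // false 20 21 -- 20 is not strictly greater than 20

	ok, got, needed = hasTwoThirdsMajority(powers, []bool{true, true, true})
	fmt.Println(ok, got, needed) // true 30 21
}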
-func (bs *BlockStore) Height() uint { - return bs.height -} - -func (bs *BlockStore) GetReader(key []byte) io.Reader { - bytez := bs.db.Get(key) - if bytez == nil { - return nil - } - return bytes.NewReader(bytez) -} - -func (bs *BlockStore) LoadBlock(height uint) *Block { - var n int64 - var err error - r := bs.GetReader(calcBlockMetaKey(height)) - if r == nil { - panic(Fmt("Block does not exist at height %v", height)) - } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) - if err != nil { - panic(Fmt("Error reading block meta: %v", err)) - } - bytez := []byte{} - for i := uint(0); i < meta.Parts.Total; i++ { - part := bs.LoadBlockPart(height, i) - bytez = append(bytez, part.Bytes...) - } - block := binary.ReadBinary(&Block{}, bytes.NewReader(bytez), &n, &err).(*Block) - if err != nil { - panic(Fmt("Error reading block: %v", err)) - } - return block -} - -func (bs *BlockStore) LoadBlockPart(height uint, index uint) *Part { - var n int64 - var err error - r := bs.GetReader(calcBlockPartKey(height, index)) - if r == nil { - panic(Fmt("BlockPart does not exist for height %v index %v", height, index)) - } - part := binary.ReadBinary(&Part{}, r, &n, &err).(*Part) - if err != nil { - panic(Fmt("Error reading block part: %v", err)) - } - return part -} - -func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta { - var n int64 - var err error - r := bs.GetReader(calcBlockMetaKey(height)) - if r == nil { - panic(Fmt("BlockMeta does not exist for height %v", height)) - } - meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta) - if err != nil { - panic(Fmt("Error reading block meta: %v", err)) - } - return meta -} - -// NOTE: the Commit-vote heights are for the block at `height-1` -// Since these are included in the subsequent block, the height -// is off by 1. -func (bs *BlockStore) LoadBlockValidation(height uint) *Validation { - var n int64 - var err error - r := bs.GetReader(calcBlockValidationKey(height)) - if r == nil { - panic(Fmt("BlockValidation does not exist for height %v", height)) - } - validation := binary.ReadBinary(&Validation{}, r, &n, &err).(*Validation) - if err != nil { - panic(Fmt("Error reading validation: %v", err)) - } - return validation -} - -// NOTE: the Commit-vote heights are for the block at `height` -func (bs *BlockStore) LoadSeenValidation(height uint) *Validation { - var n int64 - var err error - r := bs.GetReader(calcSeenValidationKey(height)) - if r == nil { - panic(Fmt("SeenValidation does not exist for height %v", height)) - } - validation := binary.ReadBinary(&Validation{}, r, &n, &err).(*Validation) - if err != nil { - panic(Fmt("Error reading validation: %v", err)) - } - return validation -} - -// blockParts: Must be parts of the block -// seenValidation: The +2/3 commits that were seen which finalized the height. -// If all the nodes restart after committing a block, -// we need this to reload the commits to catch-up nodes to the -// most recent height. Otherwise they'd stall at H-1. -// Also good to have to debug consensus issues & punish wrong-signers -// whose commits weren't included in the block. -func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet, seenValidation *Validation) { - height := block.Height - if height != bs.height+1 { - panic(Fmt("BlockStore can only save contiguous blocks. 
Wanted %v, got %v", bs.height+1, height)) - } - if !blockParts.IsComplete() { - panic(Fmt("BlockStore can only save complete block part sets")) - } - - // Save block meta - meta := makeBlockMeta(block, blockParts) - metaBytes := binary.BinaryBytes(meta) - bs.db.Set(calcBlockMetaKey(height), metaBytes) - - // Save block parts - for i := uint(0); i < blockParts.Total(); i++ { - bs.saveBlockPart(height, i, blockParts.GetPart(i)) - } - - // Save block validation (duplicate and separate from the Block) - blockValidationBytes := binary.BinaryBytes(block.Validation) - bs.db.Set(calcBlockValidationKey(height), blockValidationBytes) - - // Save seen validation (seen +2/3 commits) - seenValidationBytes := binary.BinaryBytes(seenValidation) - bs.db.Set(calcSeenValidationKey(height), seenValidationBytes) - - // Save new BlockStoreStateJSON descriptor - BlockStoreStateJSON{Height: height}.Save(bs.db) - - // Done! - bs.height = height -} - -func (bs *BlockStore) saveBlockPart(height uint, index uint, part *Part) { - if height != bs.height+1 { - panic(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height)) - } - partBytes := binary.BinaryBytes(part) - bs.db.Set(calcBlockPartKey(height, index), partBytes) -} - -//----------------------------------------------------------------------------- - -type BlockMeta struct { - Hash []byte // The block hash - Header *Header // The block's Header - Parts PartSetHeader // The PartSetHeader, for transfer -} - -func makeBlockMeta(block *Block, blockParts *PartSet) *BlockMeta { - return &BlockMeta{ - Hash: block.Hash(), - Header: block.Header, - Parts: blockParts.Header(), - } -} - -//----------------------------------------------------------------------------- - -func calcBlockMetaKey(height uint) []byte { - return []byte(fmt.Sprintf("H:%v", height)) -} - -func calcBlockPartKey(height uint, partIndex uint) []byte { - return []byte(fmt.Sprintf("P:%v:%v", height, partIndex)) -} - -func calcBlockValidationKey(height uint) []byte { - return []byte(fmt.Sprintf("V:%v", height)) -} - -func calcSeenValidationKey(height uint) []byte { - return []byte(fmt.Sprintf("SV:%v", height)) -} - -//----------------------------------------------------------------------------- - -var blockStoreKey = []byte("blockStore") - -type BlockStoreStateJSON struct { - Height uint -} - -func (bsj BlockStoreStateJSON) Save(db dbm.DB) { - bytes, err := json.Marshal(bsj) - if err != nil { - panic(Fmt("Could not marshal state bytes: %v", err)) - } - db.Set(blockStoreKey, bytes) -} - -func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON { - bytes := db.Get(blockStoreKey) - if bytes == nil { - return BlockStoreStateJSON{ - Height: 0, - } - } - bsj := BlockStoreStateJSON{} - err := json.Unmarshal(bytes, &bsj) - if err != nil { - panic(Fmt("Could not unmarshal bytes: %X", bytes)) - } - return bsj -}
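// types/store.go is deleted above rather than lost: rpc.go in this patch now imports
// bc "github.com/tendermint/tendermint/blockchain" and takes a *bc.BlockStore, so the
// store presumably lives on in the blockchain package. For orientation, a compact
// runnable sketch of the key scheme and the JSON height descriptor the deleted code
// persisted, with a plain map standing in for the dbm.DB interface:
package main

import (
	"encoding/json"
	"fmt"
)

type memDB map[string][]byte

func (db memDB) Set(k, v []byte) { db[string(k)] = v }
func (db memDB) Get(k []byte) []byte { return db[string(k)] }

type blockStoreStateJSON struct {
	Height uint
}

func calcBlockMetaKey(height uint) []byte        { return []byte(fmt.Sprintf("H:%v", height)) }
func calcBlockPartKey(height, index uint) []byte { return []byte(fmt.Sprintf("P:%v:%v", height, index)) }

func main() {
	db := memDB{}

	// Persist the descriptor after "committing" height 42.
	raw, err := json.Marshal(blockStoreStateJSON{Height: 42})
	if err != nil {
		panic(err)
	}
	db.Set([]byte("blockStore"), raw)

	// Reload it; a missing key means height 0 (a fresh store).
	bsj := blockStoreStateJSON{}
	if b := db.Get([]byte("blockStore")); b != nil {
		if err := json.Unmarshal(b, &bsj); err != nil {
			panic(err)
		}
	}
	fmt.Println(bsj.Height, string(calcBlockMetaKey(bsj.Height)), string(calcBlockPartKey(bsj.Height, 0)))
}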