From e227b9ea071c1917225f78de179eb6c936ba4fcb Mon Sep 17 00:00:00 2001 From: Jasmina Malicevic Date: Tue, 19 Apr 2022 15:58:39 +0200 Subject: [PATCH] blocksync/pool: initial changes for witness support --- internal/blocksync/pool.go | 166 +++++++++++++++++++++++++++++++++++-- 1 file changed, 157 insertions(+), 9 deletions(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 4c92041ef..544eba725 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -66,6 +66,13 @@ type BlockRequest struct { PeerID types.NodeID } +// request the header of a block at a certain height. Used to cross check +// the validated blocks with witnesses +type HeaderRequest struct { + Height int64 + PeerID types.NodeID +} + // BlockPool keeps track of the block sync peers, block requests and block responses. type BlockPool struct { service.BaseService @@ -76,6 +83,9 @@ type BlockPool struct { mtx sync.RWMutex // block requests requesters map[int64]*bpRequester + // witness requesters + //TODO we ideally want more than one witness per height + witnessRequesters map[int64]*witnessRequester height int64 // the lowest key in requesters. // peers @@ -86,7 +96,10 @@ type BlockPool struct { numPending int32 // number of requests pending assignment or block response requestsCh chan<- BlockRequest - errorsCh chan<- peerError + //ToDO We essentially request a header here but reusing the same message + // type for now + witnessRequestsCh chan<- HeaderRequest + errorsCh chan<- peerError startHeight int64 lastHundredBlockTimeStamp time.Time @@ -376,12 +389,12 @@ func (pool *BlockPool) removePeer(peerID types.NodeID) { } } - // for _, requester := range pool.verificationRequesters { - // if requester.getPeerID() == peerID { - // requester.redo(peerID) - // } - // } + for _, requester := range pool.witnessRequesters { + if requester.getPeerID() == peerID { + requester.redo(peerID) + } + } peer, ok := pool.peers[peerID] if ok { if peer.timeout != nil { @@ -409,6 +422,28 @@ func (pool *BlockPool) updateMaxPeerHeight() { pool.maxPeerHeight = max } +func (pool *BlockPool) pickIncrAvailableWitness(height int64) *bpPeer { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + for _, peer := range pool.peers { + if peer.didTimeout { + pool.removePeer(peer.id) + continue + } + if peer.numPending >= maxPendingRequestsPerPeer { + continue + } + if height < peer.base || height > peer.height || peer.id == pool.requesters[height].peerID { + continue + } + peer.incrPending() + + return peer + } + return nil +} + // Pick an available peer with the given height available. // If no peers are available, returns nil. func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { @@ -443,7 +478,8 @@ func (pool *BlockPool) makeNextRequester(ctx context.Context) { } request := newBPRequester(pool.logger, pool, nextHeight) - + witnessRequester := newWitnessRequester(pool.logger, pool, nextHeight) + witnessRequester.excludePeerID = request.peerID pool.requesters[nextHeight] = request atomic.AddInt32(&pool.numPending, 1) @@ -452,6 +488,10 @@ func (pool *BlockPool) makeNextRequester(ctx context.Context) { if err != nil { request.logger.Error("error starting request", "err", err) } + err = witnessRequester.Start(ctx) + if err != nil { + witnessRequester.logger.Error("error starting witness request", "err", err) + } } func (pool *BlockPool) requestersLen() int64 { @@ -465,6 +505,13 @@ func (pool *BlockPool) sendRequest(height int64, peerID types.NodeID) { pool.requestsCh <- BlockRequest{height, peerID} } +func (pool *BlockPool) sendWitnessRequest(height int64, peerID types.NodeID) { + if !pool.IsRunning() { + return + } + pool.witnessRequestsCh <- HeaderRequest{height, peerID} +} + func (pool *BlockPool) sendError(err error, peerID types.NodeID) { if !pool.IsRunning() { return @@ -571,6 +618,108 @@ type BlockResponse struct { commit *types.Commit } +//------------------------------------- + +type witnessRequester struct { + service.BaseService + peerID types.NodeID + header *types.Header + height int64 + getHeaderCh chan struct{} + redoCh chan types.NodeID + mtx sync.Mutex + // ID of peer we have already received this block from + excludePeerID types.NodeID + pool *BlockPool + logger log.Logger +} + +func newWitnessRequester(logger log.Logger, pool *BlockPool, height int64) *witnessRequester { + wreq := &witnessRequester{ + logger: pool.logger, + pool: pool, + height: height, + getHeaderCh: make(chan struct{}, 1), + redoCh: make(chan types.NodeID), + peerID: "", + header: nil, + } + wreq.BaseService = *service.NewBaseService(logger, "witnessReqester", wreq) + return wreq +} + +func (wreq *witnessRequester) OnStart(ctx context.Context) error { + go wreq.requestRoutine(ctx) + return nil +} +func (*witnessRequester) OnStop() {} + +func (wreq *witnessRequester) getPeerID() types.NodeID { + wreq.mtx.Lock() + defer wreq.mtx.Unlock() + return wreq.peerID +} + +func (wreq *witnessRequester) requestRoutine(ctx context.Context) { +OUTER_LOOP: + for { + // Pick a peer to send request to. + var peer *bpPeer + PICK_PEER_LOOP: + for { + if !wreq.IsRunning() || !wreq.pool.IsRunning() { + return + } + peer = wreq.pool.pickIncrAvailableWitness(wreq.height) + if peer == nil { + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + wreq.mtx.Lock() + wreq.peerID = peer.id + wreq.mtx.Unlock() + + // Send request and wait. + wreq.pool.sendWitnessRequest(wreq.height, peer.id) + WAIT_LOOP: + for { + select { + case <-ctx.Done(): + return + case peerID := <-wreq.redoCh: + if peerID == wreq.peerID { + wreq.reset() + continue OUTER_LOOP + } else { + continue WAIT_LOOP + } + case <-wreq.getHeaderCh: + // We got a block! + // Continue the for-loop and wait til Quit. + continue WAIT_LOOP + } + } + } +} + +func (wreq *witnessRequester) redo(peerID types.NodeID) { + select { + case wreq.redoCh <- peerID: + default: + } +} + +func (wreq *witnessRequester) reset() { + wreq.mtx.Lock() + defer wreq.mtx.Unlock() + wreq.peerID = "" + wreq.header = nil +} + +//------------------------------------- + type bpRequester struct { service.BaseService logger log.Logger @@ -581,8 +730,7 @@ type bpRequester struct { mtx sync.Mutex peerID types.NodeID - block *types.Block //*types.Block - + block *types.Block } func newBPRequester(logger log.Logger, pool *BlockPool, height int64) *bpRequester {