blocksync/pool: initial changes for witness support

This commit is contained in:
Jasmina Malicevic
2022-04-19 15:58:39 +02:00
parent 1c86cec072
commit e227b9ea07

View File

@@ -66,6 +66,13 @@ type BlockRequest struct {
PeerID types.NodeID
}
// request the header of a block at a certain height. Used to cross check
// the validated blocks with witnesses
type HeaderRequest struct {
Height int64
PeerID types.NodeID
}
// BlockPool keeps track of the block sync peers, block requests and block responses.
type BlockPool struct {
service.BaseService
@@ -76,6 +83,9 @@ type BlockPool struct {
mtx sync.RWMutex
// block requests
requesters map[int64]*bpRequester
// witness requesters
//TODO we ideally want more than one witness per height
witnessRequesters map[int64]*witnessRequester
height int64 // the lowest key in requesters.
// peers
@@ -86,7 +96,10 @@ type BlockPool struct {
numPending int32 // number of requests pending assignment or block response
requestsCh chan<- BlockRequest
errorsCh chan<- peerError
//ToDO We essentially request a header here but reusing the same message
// type for now
witnessRequestsCh chan<- HeaderRequest
errorsCh chan<- peerError
startHeight int64
lastHundredBlockTimeStamp time.Time
@@ -376,12 +389,12 @@ func (pool *BlockPool) removePeer(peerID types.NodeID) {
}
}
// for _, requester := range pool.verificationRequesters {
// if requester.getPeerID() == peerID {
// requester.redo(peerID)
// }
// }
for _, requester := range pool.witnessRequesters {
if requester.getPeerID() == peerID {
requester.redo(peerID)
}
}
peer, ok := pool.peers[peerID]
if ok {
if peer.timeout != nil {
@@ -409,6 +422,28 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}
func (pool *BlockPool) pickIncrAvailableWitness(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, peer := range pool.peers {
if peer.didTimeout {
pool.removePeer(peer.id)
continue
}
if peer.numPending >= maxPendingRequestsPerPeer {
continue
}
if height < peer.base || height > peer.height || peer.id == pool.requesters[height].peerID {
continue
}
peer.incrPending()
return peer
}
return nil
}
// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
@@ -443,7 +478,8 @@ func (pool *BlockPool) makeNextRequester(ctx context.Context) {
}
request := newBPRequester(pool.logger, pool, nextHeight)
witnessRequester := newWitnessRequester(pool.logger, pool, nextHeight)
witnessRequester.excludePeerID = request.peerID
pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1)
@@ -452,6 +488,10 @@ func (pool *BlockPool) makeNextRequester(ctx context.Context) {
if err != nil {
request.logger.Error("error starting request", "err", err)
}
err = witnessRequester.Start(ctx)
if err != nil {
witnessRequester.logger.Error("error starting witness request", "err", err)
}
}
func (pool *BlockPool) requestersLen() int64 {
@@ -465,6 +505,13 @@ func (pool *BlockPool) sendRequest(height int64, peerID types.NodeID) {
pool.requestsCh <- BlockRequest{height, peerID}
}
func (pool *BlockPool) sendWitnessRequest(height int64, peerID types.NodeID) {
if !pool.IsRunning() {
return
}
pool.witnessRequestsCh <- HeaderRequest{height, peerID}
}
func (pool *BlockPool) sendError(err error, peerID types.NodeID) {
if !pool.IsRunning() {
return
@@ -571,6 +618,108 @@ type BlockResponse struct {
commit *types.Commit
}
//-------------------------------------
type witnessRequester struct {
service.BaseService
peerID types.NodeID
header *types.Header
height int64
getHeaderCh chan struct{}
redoCh chan types.NodeID
mtx sync.Mutex
// ID of peer we have already received this block from
excludePeerID types.NodeID
pool *BlockPool
logger log.Logger
}
func newWitnessRequester(logger log.Logger, pool *BlockPool, height int64) *witnessRequester {
wreq := &witnessRequester{
logger: pool.logger,
pool: pool,
height: height,
getHeaderCh: make(chan struct{}, 1),
redoCh: make(chan types.NodeID),
peerID: "",
header: nil,
}
wreq.BaseService = *service.NewBaseService(logger, "witnessReqester", wreq)
return wreq
}
func (wreq *witnessRequester) OnStart(ctx context.Context) error {
go wreq.requestRoutine(ctx)
return nil
}
func (*witnessRequester) OnStop() {}
func (wreq *witnessRequester) getPeerID() types.NodeID {
wreq.mtx.Lock()
defer wreq.mtx.Unlock()
return wreq.peerID
}
func (wreq *witnessRequester) requestRoutine(ctx context.Context) {
OUTER_LOOP:
for {
// Pick a peer to send request to.
var peer *bpPeer
PICK_PEER_LOOP:
for {
if !wreq.IsRunning() || !wreq.pool.IsRunning() {
return
}
peer = wreq.pool.pickIncrAvailableWitness(wreq.height)
if peer == nil {
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
break PICK_PEER_LOOP
}
wreq.mtx.Lock()
wreq.peerID = peer.id
wreq.mtx.Unlock()
// Send request and wait.
wreq.pool.sendWitnessRequest(wreq.height, peer.id)
WAIT_LOOP:
for {
select {
case <-ctx.Done():
return
case peerID := <-wreq.redoCh:
if peerID == wreq.peerID {
wreq.reset()
continue OUTER_LOOP
} else {
continue WAIT_LOOP
}
case <-wreq.getHeaderCh:
// We got a block!
// Continue the for-loop and wait til Quit.
continue WAIT_LOOP
}
}
}
}
func (wreq *witnessRequester) redo(peerID types.NodeID) {
select {
case wreq.redoCh <- peerID:
default:
}
}
func (wreq *witnessRequester) reset() {
wreq.mtx.Lock()
defer wreq.mtx.Unlock()
wreq.peerID = ""
wreq.header = nil
}
//-------------------------------------
type bpRequester struct {
service.BaseService
logger log.Logger
@@ -581,8 +730,7 @@ type bpRequester struct {
mtx sync.Mutex
peerID types.NodeID
block *types.Block //*types.Block
block *types.Block
}
func newBPRequester(logger log.Logger, pool *BlockPool, height int64) *bpRequester {