blocksync: witnesses for verification, not fully working yet

This commit is contained in:
Jasmina Malicevic
2022-04-20 15:23:50 +02:00
parent e227b9ea07
commit 38d36b59ec
5 changed files with 634 additions and 45 deletions

View File

@@ -113,19 +113,22 @@ func NewBlockPool(
start int64,
requestsCh chan<- BlockRequest,
errorsCh chan<- peerError,
witnessRequestCh chan<- HeaderRequest,
) *BlockPool {
bp := &BlockPool{
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
requesters: make(map[int64]*bpRequester),
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
requesters: make(map[int64]*bpRequester),
witnessRequesters: make(map[int64]*witnessRequester),
// verificationRequesters: make(map[int64]*bpRequester),
height: start,
startHeight: start,
numPending: 0,
requestsCh: requestsCh,
errorsCh: errorsCh,
lastSyncRate: 0,
height: start,
startHeight: start,
numPending: 0,
requestsCh: requestsCh,
errorsCh: errorsCh,
witnessRequestsCh: witnessRequestCh,
lastSyncRate: 0,
}
bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
return bp
@@ -295,6 +298,10 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {
return peerID
}
// AddWitnessHeader records a header received from a witness peer on the
// requester tracking that height, so the block fetched from the primary
// peer can later be cross-checked against it.
//
// NOTE(review): this touches pool.witnessRequesters without taking a pool
// mutex — confirm whether concurrent access from the reactor requires locking.
func (pool *BlockPool) AddWitnessHeader(header *types.Header) {
	// A map miss yields a nil *witnessRequester; the previous version
	// dereferenced it unconditionally and would panic whenever a witness
	// responded for a height we are not (or are no longer) tracking.
	if requester, ok := pool.witnessRequesters[header.Height]; ok && requester != nil {
		requester.header = header
	}
}
// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSize int) {
@@ -481,7 +488,7 @@ func (pool *BlockPool) makeNextRequester(ctx context.Context) {
witnessRequester := newWitnessRequester(pool.logger, pool, nextHeight)
witnessRequester.excludePeerID = request.peerID
pool.requesters[nextHeight] = request
pool.witnessRequesters[nextHeight] = witnessRequester
atomic.AddInt32(&pool.numPending, 1)
err := request.Start(ctx)

View File

@@ -86,7 +86,8 @@ func TestBlockPoolBasic(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh)
witnessRequestCh := make(chan HeaderRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, witnessRequestCh)
if err := pool.Start(ctx); err != nil {
t.Error(err)
@@ -144,7 +145,8 @@ func TestBlockPoolTimeout(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(logger, start, requestsCh, errorsCh)
witnessRequestCh := make(chan HeaderRequest, 1000)
pool := NewBlockPool(logger, start, requestsCh, errorsCh, witnessRequestCh)
err := pool.Start(ctx)
if err != nil {
t.Error(err)
@@ -206,8 +208,8 @@ func TestBlockPoolRemovePeer(t *testing.T) {
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
witnessRequestCh := make(chan HeaderRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, witnessRequestCh)
err := pool.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); pool.Wait() })

View File

@@ -87,6 +87,7 @@ type Reactor struct {
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
witnessCh <-chan HeaderRequest
metrics *consensus.Metrics
eventBus *eventbus.EventBus
@@ -158,9 +159,11 @@ func (r *Reactor) OnStart(ctx context.Context) error {
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
witnessRequestCh := make(chan HeaderRequest, maxTotalRequesters)
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, witnessRequestCh)
r.requestsCh = requestsCh
r.errorsCh = errorsCh
r.witnessCh = witnessRequestCh
if r.blockSync.IsSet() {
if err := r.pool.Start(ctx); err != nil {
@@ -185,6 +188,27 @@ func (r *Reactor) OnStop() {
}
}
// sendHeaderToPeer loads the block at msg.Height from the local store and,
// if it exists, sends only its header back to the requesting peer (the
// header is what a witness cross-check needs, not the full block).
// If the block is not in the store, it replies with a NoBlockResponse for
// that height instead.
//
// NOTE(review): the original comment above this function was copied from
// respondToPeer; the log message below still says "block" even though this
// path serves header requests — consider rewording that runtime string.
func (r *Reactor) sendHeaderToPeer(ctx context.Context, msg *bcproto.HeaderRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
	block := r.store.LoadBlockProto(msg.Height)
	if block != nil {
		// Strip the header out of the stored proto block and send it alone.
		header := &block.Header
		return blockSyncCh.Send(ctx, p2p.Envelope{
			To:      peerID,
			Message: &bcproto.HeaderResponse{Header: header},
		})
	}
	r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
	return blockSyncCh.Send(ctx, p2p.Envelope{
		To:      peerID,
		Message: &bcproto.NoBlockResponse{Height: msg.Height},
	})
}
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
@@ -227,6 +251,16 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case BlockSyncChannel:
switch msg := envelope.Message.(type) {
case *bcproto.HeaderRequest:
return r.sendHeaderToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.HeaderResponse:
header, err := types.HeaderFromProto(msg.Header)
if err != nil {
r.logger.Error("faled to convert header from proto", "err", err)
return err
}
r.pool.AddWitnessHeader(&header)
case *bcproto.BlockRequest:
return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.BlockResponse:
@@ -375,6 +409,18 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel)
select {
case <-ctx.Done():
return
case wreq := <-r.witnessCh:
if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: wreq.PeerID,
Message: &bcproto.HeaderRequest{Height: wreq.Height},
}); err != nil {
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: wreq.PeerID,
Err: err,
}); err != nil {
return
}
}
case request := <-r.requestsCh:
if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: request.PeerID,
@@ -404,6 +450,17 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel)
}
}
}
// verifyWithWitnesses cross-checks a freshly verified block against the
// header obtained from a witness peer at the same height, if one is
// available. It returns an error when the witness header hash differs from
// the block hash. A missing witness requester, or a witness that has not
// delivered its header yet, is treated as "nothing to check" and yields nil.
//
// NOTE(review): reads r.pool.witnessRequesters without synchronization —
// confirm this runs on the same goroutine that mutates the map.
func (r *Reactor) verifyWithWitnesses(newBlock *types.Block) error {
	// Single map lookup instead of the original double index; a miss
	// returns a nil *witnessRequester.
	requester := r.pool.witnessRequesters[newBlock.Height]
	if requester == nil {
		return nil
	}
	// The witness response may not have arrived yet; calling Hash() on a
	// nil header would panic (the original dereferenced it unguarded).
	witnessHeader := requester.header
	if witnessHeader == nil {
		return nil
	}
	if !bytes.Equal(witnessHeader.Hash(), newBlock.Hash()) {
		r.logger.Error("hashes do not match with witness header")
		return errors.New("header not matching the header provided by the witness")
	}
	return nil
}
// poolRoutine handles messages from the poolReactor telling the reactor what to
// do.
@@ -556,10 +613,18 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
)
return
}
r.lastTrustedBlock = &BlockResponse{block: newBlock, commit: verifyBlock.LastCommit}
}
r.lastTrustedBlock.block = newBlock
r.lastTrustedBlock.commit = verifyBlock.LastCommit
if err := r.verifyWithWitnesses(newBlock); err != nil {
return
}
if r.lastTrustedBlock == nil {
r.lastTrustedBlock = &BlockResponse{block: newBlock, commit: verifyBlock.LastCommit}
} else {
r.lastTrustedBlock.block = newBlock
r.lastTrustedBlock.commit = verifyBlock.LastCommit
}
r.pool.PopRequest()
// TODO: batch saves so we do not persist to disk every block