blocksync: Initial working commit with adjacent block verification

store: Added functions to return protobuf objects for block and commit (perf opt)
proto: Added Commit to the BlockResponse message

- WARN: Manual proto files for blocksync
Jasmina Malicevic
2022-04-07 14:25:09 +02:00
parent cb8e6b1c1a
commit 09f522b249
5 changed files with 259 additions and 108 deletions
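
The heart of the change is the adjacent-verification rule: each incoming block is checked directly against the last trusted block at height X, instead of waiting for two buffered blocks and using the second's LastCommit. Below is a minimal, self-contained sketch of that rule; `header` and `verifyAdjacent` are stand-ins for illustration, not the tendermint types used in the diff:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"time"
)

// Stand-in for only the header fields the check touches.
type header struct {
	Height             int64
	Time               time.Time
	ValidatorsHash     []byte
	NextValidatorsHash []byte
}

// verifyAdjacent mirrors the rule VerifyAdjacent enforces below: the new
// header must sit exactly one height above the trusted one, be newer in
// time, and carry the validator set the trusted header committed to.
func verifyAdjacent(trusted, untrusted *header) error {
	if len(trusted.NextValidatorsHash) == 0 {
		return errors.New("next validators hash in trusted header is empty")
	}
	if untrusted.Height != trusted.Height+1 {
		return errors.New("headers must be adjacent in height")
	}
	if !untrusted.Time.After(trusted.Time) {
		return fmt.Errorf("expected new header time %v to be after old header time %v",
			untrusted.Time, trusted.Time)
	}
	if !bytes.Equal(untrusted.ValidatorsHash, trusted.NextValidatorsHash) {
		return errors.New("new header's validator hash does not match trusted NextValidatorsHash")
	}
	return nil
}

func main() {
	now := time.Now()
	trusted := &header{Height: 10, Time: now, NextValidatorsHash: []byte{0xAA}}
	next := &header{Height: 11, Time: now.Add(time.Second), ValidatorsHash: []byte{0xAA}}
	fmt.Println(verifyAdjacent(trusted, next)) // <nil>
}
```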

internal/blocksync/pool.go

@@ -76,7 +76,10 @@ type BlockPool struct {
mtx sync.RWMutex
// block requests
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
// block requests to verify blocks against peers
// verificationRequesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
// peers
peers map[types.NodeID]*bpPeer
maxPeerHeight int64 // the biggest reported height
@@ -102,9 +105,10 @@ func NewBlockPool(
) *BlockPool {
bp := &BlockPool{
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
requesters: make(map[int64]*bpRequester),
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
requesters: make(map[int64]*bpRequester),
// verificationRequesters: make(map[int64]*bpRequester),
height: start,
startHeight: start,
numPending: 0,
@@ -204,6 +208,16 @@ func (pool *BlockPool) IsCaughtUp() bool {
return pool.height >= (pool.maxPeerHeight - 1)
}
func (pool *BlockPool) PeekBlock() (first *BlockResponse) {
pool.mtx.RLock()
defer pool.mtx.RUnlock()
if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
}
return
}
// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
@@ -213,16 +227,15 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
defer pool.mtx.RUnlock()
if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
first = r.getBlock().block
}
if r := pool.requesters[pool.height+1]; r != nil {
second = r.getBlock()
second = r.getBlock().block
}
return
}
// PopRequest pops the first block at pool.height.
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@@ -248,6 +261,10 @@ func (pool *BlockPool) PopRequest() {
} else {
panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
}
// if r := pool.verificationRequesters[pool.height]; r != nil {
// r.Stop()
// delete(pool.verificationRequesters, pool.height)
// }
}
// RedoRequest invalidates the block at pool.height,
@@ -268,15 +285,15 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {
// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSize int) {
func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockResponse, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
requester := pool.requesters[block.Height]
requester := pool.requesters[blockResponse.block.Height]
if requester == nil {
pool.logger.Error("peer sent us a block we didn't expect",
"peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
diff := pool.height - block.Height
"peer", peerID, "curHeight", pool.height, "blockHeight", blockResponse.block.Height)
diff := pool.height - blockResponse.block.Height
if diff < 0 {
diff *= -1
}
@@ -286,16 +303,18 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSi
return
}
if requester.setBlock(block, peerID) {
if requester.setBlock(blockResponse, peerID) {
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
} else {
err := errors.New("requester is different or block already exists")
pool.logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", block.Height)
pool.logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", blockResponse.block.Height)
pool.sendError(err, peerID)
}
}
@@ -356,7 +375,13 @@ func (pool *BlockPool) removePeer(peerID types.NodeID) {
if requester.getPeerID() == peerID {
requester.redo(peerID)
}
}
// for _, requester := range pool.verificationRequesters {
// if requester.getPeerID() == peerID {
// requester.redo(peerID)
// }
// }
peer, ok := pool.peers[peerID]
if ok {
@@ -403,6 +428,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
continue
}
peer.incrPending()
return peer
}
return nil
@@ -420,6 +446,7 @@ func (pool *BlockPool) makeNextRequester(ctx context.Context) {
request := newBPRequester(pool.logger, pool, nextHeight)
pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1)
err := request.Start(ctx)
@@ -540,6 +567,11 @@ func (peer *bpPeer) onTimeout() {
//-------------------------------------
type BlockResponse struct {
block *types.Block
commit *types.Commit
}
type bpRequester struct {
service.BaseService
logger log.Logger
@@ -550,7 +582,8 @@ type bpRequester struct {
mtx sync.Mutex
peerID types.NodeID
block *types.Block
block *BlockResponse //*types.Block
}
func newBPRequester(logger log.Logger, pool *BlockPool, height int64) *bpRequester {
@@ -576,7 +609,7 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {
func (*bpRequester) OnStop() {}
// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID types.NodeID) bool {
func (bpr *bpRequester) setBlock(block *BlockResponse, peerID types.NodeID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
@@ -592,7 +625,7 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID types.NodeID) bool {
return true
}
func (bpr *bpRequester) getBlock() *types.Block {
func (bpr *bpRequester) getBlock() *BlockResponse {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
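
The net effect on the pool's API: requesters now buffer a BlockResponse (a block paired with the commit that signs it), and consumers peek a single height via PeekBlock instead of PeekTwoBlocks. A toy sketch of the new consumption pattern, with stand-in types and assumed shapes rather than the real pool:

```go
package main

import "fmt"

// Toy stand-ins for the pool types in this file.
type block struct{ height int64 }
type commit struct{}

// blockResponse pairs a block with its own commit, so one response is
// verifiable on its own.
type blockResponse struct {
	block  *block
	commit *commit
}

type blockPool struct {
	height    int64
	responses map[int64]*blockResponse
}

// peekBlock mirrors the new single-height accessor that replaces
// PeekTwoBlocks in this diff.
func (p *blockPool) peekBlock() *blockResponse { return p.responses[p.height] }

// popRequest advances past the verified height.
func (p *blockPool) popRequest() {
	delete(p.responses, p.height)
	p.height++
}

func main() {
	p := &blockPool{height: 5, responses: map[int64]*blockResponse{
		5: {&block{height: 5}, &commit{}},
	}}
	if r := p.peekBlock(); r != nil {
		fmt.Println("verified and popped height", r.block.height)
		p.popRequest()
	}
}
```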

internal/blocksync/pool_test.go

@@ -43,7 +43,7 @@ func (p testPeer) runInputRoutine() {
// Request desired, pretend like we got the block immediately.
func (p testPeer) simulateInput(input inputData) {
block := &types.Block{Header: types.Header{Height: input.request.Height}}
input.pool.AddBlock(input.request.PeerID, block, 123)
input.pool.AddBlock(input.request.PeerID, &BlockResponse{block, &types.Commit{}}, 123)
// TODO: uncommenting this creates a race which is detected by:
// https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856
// see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890
@@ -110,8 +110,8 @@ func TestBlockPoolBasic(t *testing.T) {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
first := pool.PeekBlock()
if first != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
@@ -164,8 +164,8 @@ func TestBlockPoolTimeout(t *testing.T) {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
first := pool.PeekBlock()
if first != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)

internal/blocksync/reactor.go

@@ -1,6 +1,7 @@
package blocksync
import (
"bytes"
"context"
"errors"
"fmt"
@@ -15,6 +16,7 @@ import (
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/light"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
"github.com/tendermint/tendermint/types"
)
@@ -63,6 +65,55 @@ func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
func (r *Reactor) VerifyAdjacent(
trustedHeader *types.SignedHeader, // height=X
untrustedHeader *types.SignedHeader, // height=X+1
untrustedVals *types.ValidatorSet, // height=X+1
) error {
if len(trustedHeader.NextValidatorsHash) == 0 {
return errors.New("next validators hash in trusted header is empty")
}
if untrustedHeader.Height != trustedHeader.Height+1 {
return errors.New("headers must be adjacent in height")
}
if err := untrustedHeader.ValidateBasic(trustedHeader.ChainID); err != nil {
return fmt.Errorf("untrustedHeader.ValidateBasic failed: %w", err)
}
if untrustedHeader.Height <= trustedHeader.Height {
return fmt.Errorf("expected new header height %d to be greater than one of old header %d",
untrustedHeader.Height,
trustedHeader.Height)
}
if !untrustedHeader.Time.After(trustedHeader.Time) {
return fmt.Errorf("expected new header time %v to be after old header time %v",
untrustedHeader.Time,
trustedHeader.Time)
}
if !bytes.Equal(untrustedHeader.ValidatorsHash, untrustedVals.Hash()) {
return fmt.Errorf("expected new header validators (%X) to match those that were supplied (%X) at height %d",
untrustedHeader.ValidatorsHash,
untrustedVals.Hash(),
untrustedHeader.Height,
)
}
// Check the validator hashes are the same
if !bytes.Equal(untrustedHeader.ValidatorsHash, trustedHeader.NextValidatorsHash) {
err := fmt.Errorf("expected old header's next validators (%X) to match those from new header (%X)",
trustedHeader.NextValidatorsHash,
untrustedHeader.ValidatorsHash,
)
return light.ErrInvalidHeader{Reason: err}
}
return nil
}
// Reactor handles long-term catchup syncing.
type Reactor struct {
service.BaseService
@@ -89,6 +140,8 @@ type Reactor struct {
eventBus *eventbus.EventBus
syncStartTime time.Time
lastTrustedBlock *BlockResponse
}
// NewReactor returns new reactor instance.
@@ -111,16 +164,17 @@ func NewReactor(
}
r := &Reactor{
logger: logger,
stateStore: stateStore,
blockExec: blockExec,
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
blockSyncCh: blockSyncCh,
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
logger: logger,
stateStore: stateStore,
blockExec: blockExec,
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
blockSyncCh: blockSyncCh,
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
lastTrustedBlock: nil,
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@@ -182,18 +236,15 @@ func (r *Reactor) OnStop() {
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID) error {
block := r.store.LoadBlock(msg.Height)
block := r.store.LoadBlockProto(msg.Height)
if block != nil {
blockProto, err := block.ToProto()
if err != nil {
r.logger.Error("failed to convert msg to protobuf", "err", err)
return err
blockCommit := r.store.LoadBlockCommitProto(msg.Height)
if blockCommit != nil {
return r.blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: block, Commit: blockCommit},
})
}
return r.blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
})
}
r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
@@ -219,8 +270,13 @@ func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Enve
logger.Error("failed to convert block from proto", "err", err)
return err
}
commit, err := types.CommitFromProto(msg.Commit)
if err != nil {
logger.Error("failed to convert commit from proto", "err", err)
return err
}
r.pool.AddBlock(envelope.From, block, block.Size())
r.pool.AddBlock(envelope.From, &BlockResponse{block, commit}, block.Size())
case *bcproto.StatusRequest:
return r.blockSyncCh.Send(ctx, p2p.Envelope{
@@ -410,9 +466,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
blocksSynced = uint64(0)
chainID = r.initialState.ChainID
state = r.initialState
state = r.initialState
lastHundred = time.Now()
lastRate = 0.0
@@ -482,94 +536,109 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
//
// TODO: Uncouple from request routine.
// see if there are any blocks to sync
first, second := r.pool.PeekTwoBlocks()
if first == nil || second == nil {
// we need both to sync the first block
newBlock := r.pool.PeekBlock()
if newBlock == nil {
continue
} else {
// try again quickly next loop
didProcessCh <- struct{}{}
}
firstParts, err := first.MakePartSet(types.BlockPartSizeBytes)
if err != nil {
if r.lastTrustedBlock == nil && r.initialState.LastBlockHeight != 0 {
r.lastTrustedBlock = &BlockResponse{r.store.LoadBlock(r.initialState.LastBlockHeight), r.store.LoadBlockCommit(r.initialState.LastBlockHeight)}
if r.lastTrustedBlock == nil {
panic("Failed to load last trusted block")
}
}
newBlockParts, err2 := newBlock.block.MakePartSet(types.BlockPartSizeBytes)
if err2 != nil {
r.logger.Error("failed to make ",
"height", first.Height,
"err", err.Error())
"height", newBlock.block.Height,
"err", err2.Error())
return
}
var (
firstPartSetHeader = firstParts.Header()
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
newBlockPartSetHeader = newBlockParts.Header()
newBlockID = types.BlockID{Hash: newBlock.block.Hash(), PartSetHeader: newBlockPartSetHeader}
)
// Finally, verify the first block using the second's commit.
//
// NOTE: We can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
if err = state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit); err != nil {
err = fmt.Errorf("invalid last commit: %w", err)
r.logger.Error(
err.Error(),
"last_commit", second.LastCommit,
"block_id", firstID,
"height", first.Height,
)
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID,
Err: err,
}); serr != nil {
return
if r.lastTrustedBlock != nil && r.lastTrustedBlock.block != nil {
if newBlock.block.Height != r.lastTrustedBlock.block.Height+1 {
panic("Need last block")
}
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
// ToDo @jmalicevic
// newBlock.last commit - validates r.lastTrustedBlock
// if fail - peer dismissed
// r.lastTrustedBlock.header. nextValidator = newBlock. validator
// validators. validate (newBlock)
err := r.VerifyAdjacent(&types.SignedHeader{Header: &r.lastTrustedBlock.block.Header, Commit: r.lastTrustedBlock.commit}, &types.SignedHeader{Header: &newBlock.block.Header, Commit: newBlock.commit}, state.NextValidators)
if err != nil {
err = fmt.Errorf("invalid last commit: %w", err)
r.logger.Error(
err.Error(),
"last_commit", newBlock.block.LastCommit,
"block_id", newBlockID,
"height", r.lastTrustedBlock.block.Height,
)
peerID := r.pool.RedoRequest(r.lastTrustedBlock.block.Height + 1)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID2,
NodeID: peerID,
Err: err,
}); serr != nil {
return
}
}
} else {
r.pool.PopRequest()
// chainID := r.initialState.ChainID
oldHash := r.initialState.Validators.Hash()
if !bytes.Equal(oldHash, newBlock.block.ValidatorsHash) {
// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(first, firstParts, second.LastCommit)
fmt.Println(
var err error
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first)
if err != nil {
// TODO: This is bad, are we zombie?
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
r.metrics.RecordConsMetrics(first)
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
r.logger.Info(
"block sync rate",
"height", r.pool.height,
"max_peer_height", r.pool.MaxPeerHeight(),
"blocks/s", lastRate,
"initial hash ", r.initialState.Validators.Hash(),
"new hash ", newBlock.block.ValidatorsHash,
)
lastHundred = time.Now()
return
}
}
r.lastTrustedBlock = newBlock
r.pool.PopRequest()
// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(newBlock.block, newBlockParts, newBlock.commit)
var err error
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
state, err = r.blockExec.ApplyBlock(ctx, state, newBlockID, newBlock.block)
if err != nil {
// TODO: This is bad, are we zombie?
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", newBlock.block.Height, newBlock.block.Hash(), err))
}
r.metrics.RecordConsMetrics(newBlock.block)
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
r.logger.Info(
"block sync rate",
"height", r.pool.height,
"max_peer_height", r.pool.MaxPeerHeight(),
"blocks/s", lastRate,
)
lastHundred = time.Now()
}
}
}
}
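
Condensed, the reordered poolRoutine now works roughly as follows; this is a sketch under stand-in types (`response`, `step` are illustrative names), with the peer-eviction and channel plumbing omitted:

```go
package main

import "fmt"

// Stand-in for the state one loop iteration touches.
type response struct {
	height int64
	ok     bool // stands in for VerifyAdjacent succeeding
}

// step sketches one iteration: verify the peeked block against the last
// trusted one; only on success does it become the new trusted block and
// get saved and applied.
func step(lastTrusted, incoming *response) (*response, error) {
	if lastTrusted != nil {
		if incoming.height != lastTrusted.height+1 {
			return lastTrusted, fmt.Errorf("need height %d, got %d",
				lastTrusted.height+1, incoming.height)
		}
		if !incoming.ok {
			// the real code calls pool.RedoRequest and reports a peer error here
			return lastTrusted, fmt.Errorf("height %d failed adjacent verification", incoming.height)
		}
	}
	// SaveBlock + ApplyBlock would happen here; incoming is now trusted
	return incoming, nil
}

func main() {
	trusted := &response{height: 7, ok: true}
	trusted, err := step(trusted, &response{height: 8, ok: true})
	fmt.Println(trusted.height, err) // 8 <nil>
}
```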

internal/store/store.go

@@ -126,6 +126,34 @@ func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta {
return nil
}
// LoadBlockProto returns the block with the given height in protobuf form.
// If no block is found for that height, it returns nil.
func (bs *BlockStore) LoadBlockProto(height int64) *tmproto.Block {
var blockMeta = bs.LoadBlockMeta(height)
if blockMeta == nil {
return nil
}
pbb := new(tmproto.Block)
buf := []byte{}
for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ {
part := bs.LoadBlockPart(height, i)
// If the part is missing (e.g. since it has been deleted after we
// loaded the block meta) we consider the whole block to be missing.
if part == nil {
return nil
}
buf = append(buf, part.Bytes...)
}
err := proto.Unmarshal(buf, pbb)
if err != nil {
// NOTE: The existence of meta should imply the existence of the
// block. So, make sure meta is only saved after blocks are saved.
panic(fmt.Errorf("error reading block: %w", err))
}
return pbb
}
// LoadBlock returns the block with the given height.
// If no block is found for that height, it returns nil.
func (bs *BlockStore) LoadBlock(height int64) *types.Block {
@@ -254,6 +282,26 @@ func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
return blockMeta
}
// LoadBlockCommitProto returns the Commit for the given height in protobuf form.
// This commit consists of the +2/3 and other Precommit-votes for block at `height`,
// and it comes from the block.LastCommit for `height+1`.
// If no commit is found for the given height, it returns nil.
func (bs *BlockStore) LoadBlockCommitProto(height int64) *tmproto.Commit {
var pbc = new(tmproto.Commit)
bz, err := bs.db.Get(blockCommitKey(height))
if err != nil {
panic(err)
}
if len(bz) == 0 {
return nil
}
err = proto.Unmarshal(bz, pbc)
if err != nil {
panic(fmt.Errorf("error reading block commit: %w", err))
}
return pbc
}
// LoadBlockCommit returns the Commit for the given height.
// This commit consists of the +2/3 and other Precommit-votes for block at `height`,
// and it comes from the block.LastCommit for `height+1`.
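
The perf motivation for the two proto loaders: the old serving path went disk bytes -> tmproto.Block -> types.Block -> tmproto.Block, a full domain conversion and back just to put the block on the wire. A stand-in sketch of the difference (illustrative types, not the real store code):

```go
package main

import "fmt"

// Stand-ins: protoBlock is the wire form as stored on disk, domainBlock
// the validated in-memory form.
type protoBlock struct{ payload string }
type domainBlock struct{ payload string }

func blockFromProto(pb *protoBlock) *domainBlock { return &domainBlock{pb.payload} } // conversion + validation
func toProto(b *domainBlock) *protoBlock         { return &protoBlock{b.payload} }  // conversion back

// respondOld models LoadBlock followed by ToProto: two conversions per request.
func respondOld(stored *protoBlock) *protoBlock { return toProto(blockFromProto(stored)) }

// respondNew models LoadBlockProto: the stored proto goes out as-is.
func respondNew(stored *protoBlock) *protoBlock { return stored }

func main() {
	stored := &protoBlock{payload: "block"}
	fmt.Println(respondOld(stored).payload == respondNew(stored).payload) // true: same bytes, less work
}
```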

proto/tendermint/blocksync/types.proto

@@ -4,7 +4,7 @@ package tendermint.blocksync;
option go_package = "github.com/tendermint/tendermint/proto/tendermint/blocksync";
import "tendermint/types/block.proto";
import "tendermint/types/types.proto";
// BlockRequest requests a block for a specific height
message BlockRequest {
int64 height = 1;
@@ -19,6 +19,7 @@ message NoBlockResponse {
// BlockResponse returns a block to the requester
message BlockResponse {
tendermint.types.Block block = 1;
tendermint.types.Commit commit = 2;
}
// StatusRequest requests the status of a peer.
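
Adding `commit` as field 2 is a wire-compatible extension: peers running the old message definition skip the unknown field, and new nodes receive a nil Commit from old peers. A sketch of the receive-side handling, using stand-in structs rather than the generated stubs:

```go
package main

import "fmt"

// Stand-in for the extended message; in the generated code both fields are
// pointers, so a response from an old peer simply arrives with Commit == nil.
type blockResponse struct {
	Block  *struct{ Height int64 }
	Commit *struct{}
}

func handle(resp *blockResponse) {
	if resp.Commit == nil {
		// response from a peer that predates field 2
		fmt.Println("no commit attached; cannot do adjacent verification")
		return
	}
	fmt.Println("block", resp.Block.Height, "arrived with its commit")
}

func main() {
	handle(&blockResponse{Block: &struct{ Height int64 }{12}, Commit: &struct{}{}})
	handle(&blockResponse{Block: &struct{ Height int64 }{13}})
}
```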