Mirror of https://github.com/tendermint/tendermint.git
blocksync: Verification using LastCommit of block at height H + 2
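The scheme in one picture: a block at height H+1 is never trusted on arrival. Its proof lives in the next block, because the LastCommit of the block at height H+2 carries the +2/3 precommit signatures for H+1, so the pool holds two blocks and verifies the first with the second. A minimal sketch of the core check, built on the tendermint types package; the helper name and freestanding signature are illustrative, not the reactor's actual API:

// verifyWithNextLastCommit sketches the H+2 rule: second (height H+2) carries
// in its LastCommit the +2/3 precommits for first (height H+1), so first can
// be verified without a trusting period. Illustrative only.
func verifyWithNextLastCommit(chainID string, vals *types.ValidatorSet, first, second *types.Block) error {
	parts, err := first.MakePartSet(types.BlockPartSizeBytes)
	if err != nil {
		return err
	}
	firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: parts.Header()}
	// VerifyCommitLight succeeds only if +2/3 of vals signed this exact
	// block ID at this height.
	return vals.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
}

The trade-off is latency: the node always trails the network tip by one block, since the newest block has no successor yet to vouch for it.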
@@ -208,7 +208,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
 	return pool.height >= (pool.maxPeerHeight - 1)
 }
 
-func (pool *BlockPool) PeekBlock() (first *BlockResponse) {
+func (pool *BlockPool) PeekBlock() (first *types.Block) {
 	pool.mtx.RLock()
 	defer pool.mtx.RUnlock()
 
@@ -227,10 +227,10 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
 	defer pool.mtx.RUnlock()
 
 	if r := pool.requesters[pool.height]; r != nil {
-		first = r.getBlock().block
+		first = r.getBlock()
 	}
 	if r := pool.requesters[pool.height+1]; r != nil {
-		second = r.getBlock().block
+		second = r.getBlock()
 	}
 	return
 }
@@ -245,10 +245,11 @@ func (pool *BlockPool) PopRequest() {
 	delete(pool.requesters, pool.height)
 	pool.height++
 	pool.lastAdvance = time.Now()
 
 	// lastSyncRate is updated every 100 blocks; it uses an adaptive filter to
 	// smooth the block sync rate, expressed in blocks per second.
-	if (pool.height-pool.startHeight)%100 == 0 {
+	// -1 because the start height is assumed to be 1. ToDo @jmalicevic: verify
+	// this is still OK when the starting height is not 1.
+	if (pool.height-pool.startHeight-1)%100 == 0 {
 		newSyncRate := 100 / time.Since(pool.lastHundredBlockTimeStamp).Seconds()
 		if pool.lastSyncRate == 0 {
 			pool.lastSyncRate = newSyncRate
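For context, the adaptive filter mentioned above is an exponentially weighted moving average over per-100-block measurements. The diff shows only the first assignment, when lastSyncRate is still zero; the 0.9/0.1 weights below are an assumption for illustration, not taken from this commit:

// updateSyncRate returns the smoothed sync rate in blocks per second,
// where elapsed is the time taken for the last 100 blocks.
func updateSyncRate(lastSyncRate float64, elapsed time.Duration) float64 {
	newSyncRate := 100 / elapsed.Seconds()
	if lastSyncRate == 0 {
		return newSyncRate // first measurement: no history to smooth against
	}
	// Weighted update: the old estimate dominates, so one unusually fast or
	// slow batch of 100 blocks does not swing the reported rate.
	return 0.9*lastSyncRate + 0.1*newSyncRate
}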
@@ -285,15 +286,15 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {
 
 // AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
 // TODO: ensure that blocks come in order for each peer.
-func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockResponse, blockSize int) {
+func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSize int) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	requester := pool.requesters[blockResponse.block.Height]
+	requester := pool.requesters[block.Height]
 	if requester == nil {
 		pool.logger.Error("peer sent us a block we didn't expect",
-			"peer", peerID, "curHeight", pool.height, "blockHeight", blockResponse.block.Height)
-		diff := pool.height - blockResponse.block.Height
+			"peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
+		diff := pool.height - block.Height
 		if diff < 0 {
 			diff *= -1
 		}
@@ -303,7 +304,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockRespons
 		return
 	}
 
-	if requester.setBlock(blockResponse, peerID) {
+	if requester.setBlock(block, peerID) {
 		atomic.AddInt32(&pool.numPending, -1)
 		peer := pool.peers[peerID]
 		if peer != nil {
@@ -312,7 +313,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockRespons
 	} else {
 
 		err := errors.New("requester is different or block already exists")
-		pool.logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", blockResponse.block.Height)
+		pool.logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", block.Height)
 		pool.sendError(err, peerID)
 
 	}
@@ -582,7 +583,7 @@ type bpRequester struct {
 
 	mtx    sync.Mutex
 	peerID types.NodeID
-	block  *BlockResponse //*types.Block
+	block  *types.Block //*types.Block
 
 }
 
@@ -609,7 +610,7 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {
 func (*bpRequester) OnStop() {}
 
 // Returns true if the peer matches and block doesn't already exist.
-func (bpr *bpRequester) setBlock(block *BlockResponse, peerID types.NodeID) bool {
+func (bpr *bpRequester) setBlock(block *types.Block, peerID types.NodeID) bool {
 	bpr.mtx.Lock()
 	if bpr.block != nil || bpr.peerID != peerID {
 		bpr.mtx.Unlock()
@@ -625,7 +626,7 @@ func (bpr *bpRequester) setBlock(block *BlockResponse, peerID types.NodeID) bool
 	return true
 }
 
-func (bpr *bpRequester) getBlock() *BlockResponse {
+func (bpr *bpRequester) getBlock() *types.Block {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
 	return bpr.block
 
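Seen together, AddBlock and bpRequester form a single-slot, peer-pinned mailbox: a block is accepted only if a request is outstanding for its height and it arrives from the peer that request was assigned to. A condensed sketch of that decision, using only calls shown above (the function name is hypothetical and error paths are trimmed):

// addBlockSketch condenses the accept/reject logic of BlockPool.AddBlock.
func (pool *BlockPool) addBlockSketch(peerID types.NodeID, block *types.Block) {
	requester := pool.requesters[block.Height]
	if requester == nil {
		// Unsolicited: no outstanding request at this height.
		pool.sendError(errors.New("peer sent us a block we didn't expect"), peerID)
		return
	}
	if requester.setBlock(block, peerID) {
		// Accepted: the assigned peer filled an empty slot.
		atomic.AddInt32(&pool.numPending, -1)
	} else {
		// Duplicate block or wrong peer: the single-slot requester rejects it.
		pool.sendError(errors.New("requester is different or block already exists"), peerID)
	}
}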
@@ -43,7 +43,7 @@ func (p testPeer) runInputRoutine() {
 // Request desired, pretend like we got the block immediately.
 func (p testPeer) simulateInput(input inputData) {
 	block := &types.Block{Header: types.Header{Height: input.request.Height}}
-	input.pool.AddBlock(input.request.PeerID, &BlockResponse{block, &types.Commit{}}, 123)
+	input.pool.AddBlock(input.request.PeerID, block, 123)
 	// TODO: uncommenting this creates a race which is detected by:
 	// https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856
 	// see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890
 
@@ -244,7 +244,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
 	if blockCommit != nil {
 		return blockSyncCh.Send(ctx, p2p.Envelope{
 			To:      peerID,
-			Message: &bcproto.BlockResponse{Block: block, Commit: blockCommit},
+			Message: &bcproto.BlockResponse{Block: block},
 		})
 	}
 }
@@ -285,13 +285,8 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 			r.logger.Error("failed to convert block from proto", "err", err)
 			return err
 		}
-		commit, err := types.CommitFromProto(msg.Commit)
-		if err != nil {
-			r.logger.Error("failed to convert commit from proto", "err", err)
-			return err
-		}
 
-		r.pool.AddBlock(envelope.From, &BlockResponse{block, commit}, block.Size())
+		r.pool.AddBlock(envelope.From, block, block.Size())
 	case *bcproto.StatusRequest:
 		return blockSyncCh.Send(ctx, p2p.Envelope{
 			To: envelope.From,
@@ -533,50 +528,75 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
 		//
 		// TODO: Uncouple from request routine.
 
-		newBlock := r.pool.PeekBlock()
+		newBlock, verifyBlock := r.pool.PeekTwoBlocks()
 
-		if newBlock == nil {
+		if newBlock == nil || verifyBlock == nil {
 			continue
 		} else {
 			didProcessCh <- struct{}{}
 		}
 
-		if r.lastTrustedBlock == nil && r.initialState.LastBlockHeight != 0 {
-			r.lastTrustedBlock = &BlockResponse{r.store.LoadBlock(r.initialState.LastBlockHeight), r.store.LoadBlockCommit(r.initialState.LastBlockHeight)}
-			if r.lastTrustedBlock == nil {
-				panic("Failed to load last trusted block")
-			}
-		}
-		newBlockParts, err2 := newBlock.block.MakePartSet(types.BlockPartSizeBytes)
+		newBlockParts, err2 := newBlock.MakePartSet(types.BlockPartSizeBytes)
 		if err2 != nil {
 			r.logger.Error("failed to make block part set",
-				"height", newBlock.block.Height,
+				"height", newBlock.Height,
 				"err", err2.Error())
 			return
 		}
 
 		var (
 			newBlockPartSetHeader = newBlockParts.Header()
-			newBlockID            = types.BlockID{Hash: newBlock.block.Hash(), PartSetHeader: newBlockPartSetHeader}
+			newBlockID            = types.BlockID{Hash: newBlock.Hash(), PartSetHeader: newBlockPartSetHeader}
 		)
 
-		if r.lastTrustedBlock != nil && r.lastTrustedBlock.block != nil {
-			if newBlock.block.Height != r.lastTrustedBlock.block.Height+1 {
-				panic("Need last block")
-			// ToDo @jmalicevic
-			// newBlock.last commit - validates r.lastTrustedBlock
-			// if fail - peer dismissed
-			// r.lastTrustedBlock.header. nextValidator = newBlock. validator
-			// validators. validate (newBlock)
+		if r.lastTrustedBlock != nil {
+
+			// If the blockID in LastCommit of NewBlock does not match the trusted block,
+			// we can assume NewBlock is not correct.
+			if !(newBlock.LastCommit.BlockID.Equals(r.lastTrustedBlock.commit.BlockID)) {
+				peerID := r.pool.RedoRequest(r.lastTrustedBlock.block.Height + 1)
+				if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
+					NodeID: peerID,
+					Err:    errors.New("invalid block for verification"),
+				}); serr != nil {
+					return
+				}
+			}
 
+			// ToDo @jmalicevic
+			// newBlock.last commit - validates r.lastTrustedBlock
+			// if fail - peer dismissed
+			// r.lastTrustedBlock.header. nextValidator = newBlock. validator
+			// validators. validate (newBlock)
+			// ToDo: Verify verifyBlock.LastCommit validators against state.NextValidators.
+			// If they do not match, we need a new verifyBlock.
 
-			err := r.VerifyAdjacent(&types.SignedHeader{Header: &r.lastTrustedBlock.block.Header, Commit: r.lastTrustedBlock.commit}, &types.SignedHeader{Header: &newBlock.block.Header, Commit: newBlock.commit}, state.NextValidators)
+			if err := state.NextValidators.VerifyCommitLight(state.ChainID, newBlockID, newBlock.Height, verifyBlock.LastCommit); err != nil {
+
+				err = fmt.Errorf("invalid verification block, validator hash does not match: %w", err)
+				r.logger.Error(
+					err.Error(),
+					"last_commit", verifyBlock.LastCommit,
+					"block_id", newBlockID,
+					"height", r.lastTrustedBlock.block.Height,
+				)
+				peerID := r.pool.RedoRequest(r.lastTrustedBlock.block.Height + 2)
+				if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
+					NodeID: peerID,
+					Err:    err,
+				}); serr != nil {
+					return
+				}
+			}
+			// Verify NewBlock using the validator set obtained after applying the last block.
+			// Note: VerifyAdjacent in the LightClient relies on a trusting period, which is not applicable here.
+			// ToDo: We need witness verification here as well.
+			err := r.VerifyAdjacent(&types.SignedHeader{Header: &r.lastTrustedBlock.block.Header, Commit: r.lastTrustedBlock.commit}, &types.SignedHeader{Header: &newBlock.Header, Commit: verifyBlock.LastCommit}, state.NextValidators)
 			if err != nil {
 				err = fmt.Errorf("invalid last commit: %w", err)
 				r.logger.Error(
 					err.Error(),
-					"last_commit", newBlock.block.LastCommit,
+					"last_commit", verifyBlock.LastCommit,
 					"block_id", newBlockID,
 					"height", r.lastTrustedBlock.block.Height,
 				)
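Taken together, the new path runs three checks before newBlock is trusted: a linkage check (newBlock.LastCommit must point at the block already trusted), a signature check (verifyBlock.LastCommit must commit newBlock with +2/3 of the expected validators), and adjacent-header verification without a trusting period. Condensed here into one hypothetical helper for readability; it mirrors the hunk above rather than defining a new API:

// verifyNewBlock condenses the checks above. trusted pairs the last verified
// block with its commit; newBlock is at trusted.block.Height+1 and verifyBlock
// at trusted.block.Height+2. Error handling and peer banning are elided.
func (r *Reactor) verifyNewBlock(state sm.State, trusted *BlockResponse, newBlock, verifyBlock *types.Block, newBlockID types.BlockID) error {
	// 1. Linkage: newBlock must extend the block we already trust.
	if !newBlock.LastCommit.BlockID.Equals(trusted.commit.BlockID) {
		return errors.New("invalid block for verification")
	}
	// 2. Signatures: +2/3 of the expected validator set committed newBlock.
	if err := state.NextValidators.VerifyCommitLight(state.ChainID, newBlockID, newBlock.Height, verifyBlock.LastCommit); err != nil {
		return fmt.Errorf("invalid verification block, validator hash does not match: %w", err)
	}
	// 3. Header continuity, as in light-client adjacent verification but
	// without the trusting-period requirement.
	return r.VerifyAdjacent(
		&types.SignedHeader{Header: &trusted.block.Header, Commit: trusted.commit},
		&types.SignedHeader{Header: &newBlock.Header, Commit: verifyBlock.LastCommit},
		state.NextValidators,
	)
}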
@@ -589,38 +609,45 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
 					return
 				}
 			}
 
+		} else {
+
+			if r.initialState.LastBlockHeight != 0 {
+				r.lastTrustedBlock = &BlockResponse{r.store.LoadBlock(r.initialState.LastBlockHeight), r.store.LoadBlockCommit(r.initialState.LastBlockHeight)}
+				if r.lastTrustedBlock == nil {
+					panic("Failed to load last trusted block")
+				}
+			}
 			// chainID := r.initialState.ChainID
 			oldHash := r.initialState.Validators.Hash()
-			if !bytes.Equal(oldHash, newBlock.block.ValidatorsHash) {
+			if !bytes.Equal(oldHash, newBlock.ValidatorsHash) {
 
 				fmt.Println(
 
 					"initial hash ", r.initialState.Validators.Hash(),
-					"new hash ", newBlock.block.ValidatorsHash,
+					"new hash ", newBlock.ValidatorsHash,
 				)
 				return
 			}
 
+			r.lastTrustedBlock = &BlockResponse{block: newBlock, commit: verifyBlock.LastCommit}
 		}
-		r.lastTrustedBlock = newBlock
+		r.lastTrustedBlock.block = newBlock
+		r.lastTrustedBlock.commit = verifyBlock.LastCommit
 		r.pool.PopRequest()
 
 		// TODO: batch saves so we do not persist to disk every block
-		r.store.SaveBlock(newBlock.block, newBlockParts, newBlock.commit)
+		r.store.SaveBlock(newBlock, newBlockParts, verifyBlock.LastCommit)
 
 		var err error
 
 		// TODO: Same thing for app - but we would need a way to get the hash
 		// without persisting the state.
-		state, err = r.blockExec.ApplyBlock(ctx, state, newBlockID, newBlock.block)
+		state, err = r.blockExec.ApplyBlock(ctx, state, newBlockID, newBlock)
 		if err != nil {
 			// TODO: This is bad, are we zombie?
-			panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", newBlock.block.Height, newBlock.block.Hash(), err))
+			panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", newBlock.Height, newBlock.Hash(), err))
 		}
 
-		r.metrics.RecordConsMetrics(newBlock.block)
+		r.metrics.RecordConsMetrics(newBlock)
 
 		blocksSynced++
 
@@ -275,6 +275,7 @@ func TestReactor_SyncTime(t *testing.T) {
 	require.Eventually(
 		t,
 		func() bool {
+			//t.Logf("%d %d %s", rts.reactors[rts.nodes[1]].pool.height, rts.reactors[rts.nodes[0]].pool.height, rts.reactors[rts.nodes[1]].GetRemainingSyncTime())
 			return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond &&
 				rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001
 		},
@@ -315,7 +316,10 @@ func TestReactor_NoBlockResponse(t *testing.T) {
 	secondaryPool := rts.reactors[rts.nodes[1]].pool
 	require.Eventually(
 		t,
-		func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
+		func() bool {
+			t.Logf("%d %d", secondaryPool.MaxPeerHeight(), secondaryPool.height)
+			return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp()
+		},
 		10*time.Second,
 		10*time.Millisecond,
 		"expected node to be fully synced",
 