From 60eaa831afceabc7d27989e0fba9270fc5f912f3 Mon Sep 17 00:00:00 2001
From: Jasmina Malicevic
Date: Thu, 14 Apr 2022 13:37:26 +0200
Subject: [PATCH] blocksync: Verification using LastCommit of block at height H + 2

---
 internal/blocksync/pool.go         | 29 +++++----
 internal/blocksync/pool_test.go    |  2 +-
 internal/blocksync/reactor.go      | 101 ++++++++++++++++++-----------
 internal/blocksync/reactor_test.go |  6 +-
 4 files changed, 85 insertions(+), 53 deletions(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index b197a48ba..bfc6179a6 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -208,7 +208,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
 	return pool.height >= (pool.maxPeerHeight - 1)
 }
 
-func (pool *BlockPool) PeekBlock() (first *BlockResponse) {
+func (pool *BlockPool) PeekBlock() (first *types.Block) {
 	pool.mtx.RLock()
 	defer pool.mtx.RUnlock()
 
@@ -227,10 +227,10 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
 	defer pool.mtx.RUnlock()
 
 	if r := pool.requesters[pool.height]; r != nil {
-		first = r.getBlock().block
+		first = r.getBlock()
 	}
 	if r := pool.requesters[pool.height+1]; r != nil {
-		second = r.getBlock().block
+		second = r.getBlock()
 	}
 	return
 }
@@ -245,10 +245,11 @@ func (pool *BlockPool) PopRequest() {
 	delete(pool.requesters, pool.height)
 	pool.height++
 	pool.lastAdvance = time.Now()
-
 	// the lastSyncRate will be updated every 100 blocks, it uses the adaptive filter
 	// to smooth the block sync rate and the unit represents the number of blocks per second.
+	// -1 because the start height is assumed to be 1. TODO(@jmalicevic): verify this is
+	// still correct when the starting height is not 1.
-	if (pool.height-pool.startHeight)%100 == 0 {
+	if (pool.height-pool.startHeight-1)%100 == 0 {
 		newSyncRate := 100 / time.Since(pool.lastHundredBlockTimeStamp).Seconds()
 		if pool.lastSyncRate == 0 {
 			pool.lastSyncRate = newSyncRate
@@ -285,15 +286,15 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {
 
 // AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
 // TODO: ensure that blocks come in order for each peer.
-func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockResponse, blockSize int) {
+func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, blockSize int) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	requester := pool.requesters[blockResponse.block.Height]
+	requester := pool.requesters[block.Height]
 	if requester == nil {
 		pool.logger.Error("peer sent us a block we didn't expect",
-			"peer", peerID, "curHeight", pool.height, "blockHeight", blockResponse.block.Height)
-		diff := pool.height - blockResponse.block.Height
+			"peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
+		diff := pool.height - block.Height
 		if diff < 0 {
 			diff *= -1
 		}
@@ -303,7 +304,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockRespons
 		return
 	}
 
-	if requester.setBlock(blockResponse, peerID) {
+	if requester.setBlock(block, peerID) {
 		atomic.AddInt32(&pool.numPending, -1)
 		peer := pool.peers[peerID]
 		if peer != nil {
@@ -312,7 +313,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, blockResponse *BlockRespons
 	} else {
 		err := errors.New("requester is different or block already exists")
-		pool.logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", blockResponse.block.Height)
+		pool.logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", block.Height)
 		pool.sendError(err, peerID)
 	}
 
@@ -582,7 +583,7 @@ type bpRequester struct {
 	mtx    sync.Mutex
 	peerID types.NodeID
-	block  *BlockResponse //*types.Block
+	block  *types.Block
 }
 
@@ -609,7 +610,7 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {
 func (*bpRequester) OnStop() {}
 
 // Returns true if the peer matches and block doesn't already exist.
-func (bpr *bpRequester) setBlock(block *BlockResponse, peerID types.NodeID) bool {
+func (bpr *bpRequester) setBlock(block *types.Block, peerID types.NodeID) bool {
 	bpr.mtx.Lock()
 	if bpr.block != nil || bpr.peerID != peerID {
 		bpr.mtx.Unlock()
@@ -625,7 +626,7 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID types.NodeID) bool
 	return true
 }
 
-func (bpr *bpRequester) getBlock() *BlockResponse {
+func (bpr *bpRequester) getBlock() *types.Block {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
 	return bpr.block

diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go
index 2b66cfe4a..ae29b205e 100644
--- a/internal/blocksync/pool_test.go
+++ b/internal/blocksync/pool_test.go
@@ -43,7 +43,7 @@ func (p testPeer) runInputRoutine() {
 
 // Request desired, pretend like we got the block immediately.
 func (p testPeer) simulateInput(input inputData) {
 	block := &types.Block{Header: types.Header{Height: input.request.Height}}
-	input.pool.AddBlock(input.request.PeerID, &BlockResponse{block, &types.Commit{}}, 123)
+	input.pool.AddBlock(input.request.PeerID, block, 123)
 	// TODO: uncommenting this creates a race which is detected by:
 	// https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856
 	// see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890

diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index 7b053bc1d..4f2be141c 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -244,7 +244,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
 		if blockCommit != nil {
 			return blockSyncCh.Send(ctx, p2p.Envelope{
 				To:      peerID,
-				Message: &bcproto.BlockResponse{Block: block, Commit: blockCommit},
+				Message: &bcproto.BlockResponse{Block: block},
 			})
 		}
 	}
@@ -285,13 +285,8 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
 			r.logger.Error("failed to convert block from proto", "err", err)
 			return err
 		}
-		commit, err := types.CommitFromProto(msg.Commit)
-		if err != nil {
-			r.logger.Error("failed to convert commit from proto", "err", err)
-			return err
-		}
 
-		r.pool.AddBlock(envelope.From, &BlockResponse{block, commit}, block.Size())
+		r.pool.AddBlock(envelope.From, block, block.Size())
 
 	case *bcproto.StatusRequest:
 		return blockSyncCh.Send(ctx, p2p.Envelope{
 			To: envelope.From,
@@ -533,50 +528,75 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
 			//
 			// TODO: Uncouple from request routine.
 
-			newBlock := r.pool.PeekBlock()
+			newBlock, verifyBlock := r.pool.PeekTwoBlocks()
-			if newBlock == nil {
+			if newBlock == nil || verifyBlock == nil {
 				continue
 			} else {
 				didProcessCh <- struct{}{}
 			}
 
-			if r.lastTrustedBlock == nil && r.initialState.LastBlockHeight != 0 {
-				r.lastTrustedBlock = &BlockResponse{r.store.LoadBlock(r.initialState.LastBlockHeight), r.store.LoadBlockCommit(r.initialState.LastBlockHeight)}
-				if r.lastTrustedBlock == nil {
-					panic("Failed to load last trusted block")
-				}
-			}
-			newBlockParts, err2 := newBlock.block.MakePartSet(types.BlockPartSizeBytes)
+			newBlockParts, err2 := newBlock.MakePartSet(types.BlockPartSizeBytes)
 			if err2 != nil {
 				r.logger.Error("failed to make ",
-					"height", newBlock.block.Height,
+					"height", newBlock.Height,
 					"err", err2.Error())
 				return
 			}
 			var (
 				newBlockPartSetHeader = newBlockParts.Header()
-				newBlockID            = types.BlockID{Hash: newBlock.block.Hash(), PartSetHeader: newBlockPartSetHeader}
+				newBlockID            = types.BlockID{Hash: newBlock.Hash(), PartSetHeader: newBlockPartSetHeader}
 			)
-			if r.lastTrustedBlock != nil && r.lastTrustedBlock.block != nil {
-				if newBlock.block.Height != r.lastTrustedBlock.block.Height+1 {
-					panic("Need last block")
+			// TODO(@jmalicevic): verification outline:
+			// newBlock's LastCommit validates r.lastTrustedBlock;
+			// if that fails, the peer is dismissed.
+			// r.lastTrustedBlock's NextValidatorsHash must match newBlock's validator set,
+			// and those validators then validate newBlock.
+			if r.lastTrustedBlock != nil {
+
+				// If the blockID in newBlock's LastCommit does not match the trusted block's ID,
+				// we can assume newBlock is not correct
+				if !(newBlock.LastCommit.BlockID.Equals(r.lastTrustedBlock.commit.BlockID)) {
+					peerID := r.pool.RedoRequest(r.lastTrustedBlock.block.Height + 1)
+					if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
+						NodeID: peerID,
+						Err:    errors.New("invalid block for verification"),
+					}); serr != nil {
+						return
+					}
 				}
-				// ToDo @jmalicevic
-				// newBlock.last commit - validates r.lastTrustedBlock
-				// if fail - peer dismissed
-				// r.lastTrustedBlock.header. nextValidator = newBlock. validator
-				// validators. validate (newBlock)
+				// TODO: verify verifyBlock.LastCommit's validators against state.NextValidators.
+				// If they do not match, we need a new verifyBlock.
-				err := r.VerifyAdjacent(&types.SignedHeader{Header: &r.lastTrustedBlock.block.Header, Commit: r.lastTrustedBlock.commit}, &types.SignedHeader{Header: &newBlock.block.Header, Commit: newBlock.commit}, state.NextValidators)
+				if err := state.NextValidators.VerifyCommitLight(state.ChainID, newBlockID, newBlock.Height, verifyBlock.LastCommit); err != nil {
+
+					err = fmt.Errorf("invalid verification block, validator hash does not match: %w", err)
+					r.logger.Error(
+						err.Error(),
+						"last_commit", verifyBlock.LastCommit,
+						"block_id", newBlockID,
+						"height", r.lastTrustedBlock.block.Height,
+					)
+					peerID := r.pool.RedoRequest(r.lastTrustedBlock.block.Height + 2)
+					if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
+						NodeID: peerID,
+						Err:    err,
+					}); serr != nil {
+						return
+					}
+				}
+				// Verify newBlock using the validator set obtained after applying the last block.
+				// Note: VerifyAdjacent in the light client relies on a trusting period, which is not applicable here.
+				// TODO: we need witness verification here as well.
+				err := r.VerifyAdjacent(&types.SignedHeader{Header: &r.lastTrustedBlock.block.Header, Commit: r.lastTrustedBlock.commit}, &types.SignedHeader{Header: &newBlock.Header, Commit: verifyBlock.LastCommit}, state.NextValidators)
 				if err != nil {
 					err = fmt.Errorf("invalid last commit: %w", err)
 					r.logger.Error(
 						err.Error(),
-						"last_commit", newBlock.block.LastCommit,
+						"last_commit", verifyBlock.LastCommit,
 						"block_id", newBlockID,
 						"height", r.lastTrustedBlock.block.Height,
 					)
@@ -589,38 +609,45 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
 					return
 				}
 			}
-			} else {
+
+			if r.initialState.LastBlockHeight != 0 {
+				r.lastTrustedBlock = &BlockResponse{r.store.LoadBlock(r.initialState.LastBlockHeight), r.store.LoadBlockCommit(r.initialState.LastBlockHeight)}
+				if r.lastTrustedBlock.block == nil || r.lastTrustedBlock.commit == nil {
+					panic("Failed to load last trusted block")
+				}
+			}
 			// chainID := r.initialState.ChainID
 			oldHash := r.initialState.Validators.Hash()
-			if !bytes.Equal(oldHash, newBlock.block.ValidatorsHash) {
+			if !bytes.Equal(oldHash, newBlock.ValidatorsHash) {
 				fmt.Println(
 					"initial hash ", r.initialState.Validators.Hash(),
-					"new hash ", newBlock.block.ValidatorsHash,
+					"new hash ", newBlock.ValidatorsHash,
 				)
 				return
 			}
-
+			r.lastTrustedBlock = &BlockResponse{block: newBlock, commit: verifyBlock.LastCommit}
 			}
-			r.lastTrustedBlock = newBlock
+			r.lastTrustedBlock.block = newBlock
+			r.lastTrustedBlock.commit = verifyBlock.LastCommit
 			r.pool.PopRequest()
 
 			// TODO: batch saves so we do not persist to disk every block
-			r.store.SaveBlock(newBlock.block, newBlockParts, newBlock.commit)
+			r.store.SaveBlock(newBlock, newBlockParts, verifyBlock.LastCommit)
 
 			var err error
 			// TODO: Same thing for app - but we would need a way to get the hash
 			// without persisting the state.
-			state, err = r.blockExec.ApplyBlock(ctx, state, newBlockID, newBlock.block)
+			state, err = r.blockExec.ApplyBlock(ctx, state, newBlockID, newBlock)
 			if err != nil {
 				// TODO: This is bad, are we zombie?
-				panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", newBlock.block.Height, newBlock.block.Hash(), err))
+				panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", newBlock.Height, newBlock.Hash(), err))
 			}
-			r.metrics.RecordConsMetrics(newBlock.block)
+			r.metrics.RecordConsMetrics(newBlock)
 
 			blocksSynced++

diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go
index 065d75301..99c2ba8dc 100644
--- a/internal/blocksync/reactor_test.go
+++ b/internal/blocksync/reactor_test.go
@@ -275,6 +275,7 @@ func TestReactor_SyncTime(t *testing.T) {
 	require.Eventually(
 		t,
 		func() bool {
+			//t.Logf("%d %d %s", rts.reactors[rts.nodes[1]].pool.height, rts.reactors[rts.nodes[0]].pool.height, rts.reactors[rts.nodes[1]].GetRemainingSyncTime())
 			return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond &&
 				rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001
 		},
@@ -315,7 +316,10 @@ func TestReactor_NoBlockResponse(t *testing.T) {
 	secondaryPool := rts.reactors[rts.nodes[1]].pool
 	require.Eventually(
 		t,
-		func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
+		func() bool {
+			t.Logf("%d %d", secondaryPool.MaxPeerHeight(), secondaryPool.height)
+			return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp()
+		},
 		10*time.Second,
 		10*time.Millisecond,
 		"expected node to be fully synced",
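
The core idea of the patch, shown in isolation: to trust the block at height H+1 without a light-client trusting period, check that the LastCommit carried by the block at height H+2 contains +2/3 precommits for H+1's BlockID, verified against the validator set the state expects at that height (state.NextValidators after applying block H). The sketch below is illustrative only; verifyWithNextBlock is a hypothetical helper that is not part of the patch, though the types API calls mirror the ones used in poolRoutine above.

package blocksync

import (
	"fmt"

	"github.com/tendermint/tendermint/types"
)

// verifyWithNextBlock checks the candidate block (height H+1) against the
// LastCommit of the following block (height H+2). nextVals must be the
// validator set expected at the candidate's height.
func verifyWithNextBlock(chainID string, newBlock, verifyBlock *types.Block, nextVals *types.ValidatorSet) error {
	parts, err := newBlock.MakePartSet(types.BlockPartSizeBytes)
	if err != nil {
		return fmt.Errorf("failed to make part set: %w", err)
	}
	newBlockID := types.BlockID{Hash: newBlock.Hash(), PartSetHeader: parts.Header()}

	// verifyBlock.LastCommit holds the precommits for newBlock; VerifyCommitLight
	// checks that +2/3 of nextVals signed newBlockID at newBlock.Height.
	if err := nextVals.VerifyCommitLight(chainID, newBlockID, newBlock.Height, verifyBlock.LastCommit); err != nil {
		return fmt.Errorf("invalid verification block: %w", err)
	}
	return nil
}

This is also why PeekTwoBlocks replaces PeekBlock in poolRoutine: the routine cannot make progress until both the candidate and its successor are available.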
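
Both rejection branches in poolRoutine follow the same pattern: re-request the suspect height and report the peer that served it. A sketch of that shared path, assuming blockSyncCh is the *p2p.Channel passed to poolRoutine (the parameter's type is cut off in the hunk header above); rejectBlockAt is a hypothetical helper, not part of the patch.

package blocksync

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

// rejectBlockAt marks the requester at the given height for retry and reports
// the peer the bad block came from, so the router can evict it.
func (r *Reactor) rejectBlockAt(ctx context.Context, height int64, cause error, blockSyncCh *p2p.Channel) error {
	peerID := r.pool.RedoRequest(height)
	return blockSyncCh.SendError(ctx, p2p.PeerError{
		NodeID: peerID,
		Err:    cause,
	})
}

With a trusted block at height H, a bad newBlock would be re-requested at H+1 and a bad verifyBlock at H+2, matching the two RedoRequest calls in the diff.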
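
On the pool side, the only behavioral change is the window boundary in PopRequest. Below is a runnable, self-contained sketch of the rate bookkeeping that boundary feeds; the 0.9/0.1 smoothing weights are an assumption for illustration, since the hunk cuts off before the adaptive-filter update.

package main

import (
	"fmt"
	"time"
)

// ratePool isolates PopRequest's rate bookkeeping: every 100 popped blocks
// past the start height, fold the observed window rate into a smoothed
// blocks-per-second estimate.
type ratePool struct {
	height                    int64
	startHeight               int64
	lastSyncRate              float64   // smoothed blocks/s; 0 means no sample yet
	lastHundredBlockTimeStamp time.Time // start of the current window
}

func (p *ratePool) popRequest() {
	p.height++
	// Same window test as the patch; the -1 assumes the start height is 1.
	if (p.height-p.startHeight-1)%100 != 0 {
		return
	}
	newSyncRate := 100 / time.Since(p.lastHundredBlockTimeStamp).Seconds()
	if p.lastSyncRate == 0 {
		p.lastSyncRate = newSyncRate
	} else {
		p.lastSyncRate = 0.9*p.lastSyncRate + 0.1*newSyncRate // assumed weights
	}
	p.lastHundredBlockTimeStamp = time.Now()
}

func main() {
	// Start just past the first window boundary so each sample covers 100 blocks.
	p := &ratePool{height: 2, startHeight: 1, lastHundredBlockTimeStamp: time.Now()}
	for i := 0; i < 250; i++ {
		time.Sleep(time.Millisecond) // stand-in for block arrival
		p.popRequest()
	}
	fmt.Printf("smoothed sync rate: %.0f blocks/s\n", p.lastSyncRate)
}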