mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-06 20:11:08 +00:00
add logic to propagate extended commits (#8433)
This commit is contained in:
@@ -185,16 +185,20 @@ func (pool *BlockPool) IsCaughtUp() bool {
|
||||
return isCaughtUp
|
||||
}
|
||||
|
||||
// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
|
||||
// We need to see the second block's Commit to validate the first block.
|
||||
// So we peek two blocks at a time.
|
||||
// PeekTwoBlocks returns blocks at pool.height and pool.height+1. We need to
|
||||
// see the second block's Commit to validate the first block. So we peek two
|
||||
// blocks at a time. We return an extended commit, containing vote extensions
|
||||
// and their associated signatures, as this is critical to consensus in ABCI++
|
||||
// as we switch from block sync to consensus mode.
|
||||
//
|
||||
// The caller will verify the commit.
|
||||
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
|
||||
func (pool *BlockPool) PeekTwoBlocks() (first, second *types.Block, firstExtCommit *types.ExtendedCommit) {
|
||||
pool.mtx.Lock()
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
if r := pool.requesters[pool.height]; r != nil {
|
||||
first = r.getBlock()
|
||||
firstExtCommit = r.getExtendedCommit()
|
||||
}
|
||||
if r := pool.requesters[pool.height+1]; r != nil {
|
||||
second = r.getBlock()
|
||||
@@ -203,7 +207,8 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
|
||||
}
|
||||
|
||||
// PopRequest pops the first block at pool.height.
|
||||
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
|
||||
// It must have been validated by the second Commit from PeekTwoBlocks.
|
||||
// TODO(thane): (?) and its corresponding ExtendedCommit.
|
||||
func (pool *BlockPool) PopRequest() {
|
||||
pool.mtx.Lock()
|
||||
defer pool.mtx.Unlock()
|
||||
@@ -240,12 +245,22 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
|
||||
return peerID
|
||||
}
|
||||
|
||||
// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
|
||||
// TODO: ensure that blocks come in order for each peer.
|
||||
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
|
||||
// AddBlock validates that the block comes from the peer it was expected from
|
||||
// and calls the requester to store it.
|
||||
//
|
||||
// This requires an extended commit at the same height as the supplied block -
|
||||
// the block contains the last commit, but we need the latest commit in case we
|
||||
// need to switch over from block sync to consensus at this height. If the
|
||||
// height of the extended commit and the height of the block do not match, we
|
||||
// do not add the block and return an error.
|
||||
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *types.ExtendedCommit, blockSize int) error {
|
||||
pool.mtx.Lock()
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
if block.Height != extCommit.Height {
|
||||
return fmt.Errorf("heights don't match, not adding block (block height: %d, commit height: %d)", block.Height, extCommit.Height)
|
||||
}
|
||||
|
||||
requester := pool.requesters[block.Height]
|
||||
if requester == nil {
|
||||
pool.Logger.Info(
|
||||
@@ -263,19 +278,22 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
|
||||
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
|
||||
}
|
||||
return
|
||||
return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height)
|
||||
}
|
||||
|
||||
if requester.setBlock(block, peerID) {
|
||||
if requester.setBlock(block, extCommit, peerID) {
|
||||
atomic.AddInt32(&pool.numPending, -1)
|
||||
peer := pool.peers[peerID]
|
||||
if peer != nil {
|
||||
peer.decrPending(blockSize)
|
||||
}
|
||||
} else {
|
||||
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
|
||||
pool.sendError(errors.New("invalid peer"), peerID)
|
||||
err := errors.New("requester is different or block already exists")
|
||||
pool.sendError(err, peerID)
|
||||
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MaxPeerHeight returns the highest reported height.
|
||||
@@ -424,6 +442,7 @@ func (pool *BlockPool) debug() string {
|
||||
} else {
|
||||
str += fmt.Sprintf("H(%v):", h)
|
||||
str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
|
||||
str += fmt.Sprintf("C?(%v) ", pool.requesters[h].extCommit != nil)
|
||||
}
|
||||
}
|
||||
return str
|
||||
@@ -512,9 +531,10 @@ type bpRequester struct {
|
||||
gotBlockCh chan struct{}
|
||||
redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat
|
||||
|
||||
mtx tmsync.Mutex
|
||||
peerID p2p.ID
|
||||
block *types.Block
|
||||
mtx tmsync.Mutex
|
||||
peerID p2p.ID
|
||||
block *types.Block
|
||||
extCommit *types.ExtendedCommit
|
||||
}
|
||||
|
||||
func newBPRequester(pool *BlockPool, height int64) *bpRequester {
|
||||
@@ -537,13 +557,14 @@ func (bpr *bpRequester) OnStart() error {
|
||||
}
|
||||
|
||||
// Returns true if the peer matches and block doesn't already exist.
|
||||
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
|
||||
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID p2p.ID) bool {
|
||||
bpr.mtx.Lock()
|
||||
if bpr.block != nil || bpr.peerID != peerID {
|
||||
bpr.mtx.Unlock()
|
||||
return false
|
||||
}
|
||||
bpr.block = block
|
||||
bpr.extCommit = extCommit
|
||||
bpr.mtx.Unlock()
|
||||
|
||||
select {
|
||||
@@ -559,6 +580,12 @@ func (bpr *bpRequester) getBlock() *types.Block {
|
||||
return bpr.block
|
||||
}
|
||||
|
||||
func (bpr *bpRequester) getExtendedCommit() *types.ExtendedCommit {
|
||||
bpr.mtx.Lock()
|
||||
defer bpr.mtx.Unlock()
|
||||
return bpr.extCommit
|
||||
}
|
||||
|
||||
func (bpr *bpRequester) getPeerID() p2p.ID {
|
||||
bpr.mtx.Lock()
|
||||
defer bpr.mtx.Unlock()
|
||||
@@ -576,6 +603,7 @@ func (bpr *bpRequester) reset() {
|
||||
|
||||
bpr.peerID = ""
|
||||
bpr.block = nil
|
||||
bpr.extCommit = nil
|
||||
}
|
||||
|
||||
// Tells bpRequester to pick another peer and try again.
|
||||
|
||||
@@ -42,7 +42,10 @@ func (p testPeer) runInputRoutine() {
|
||||
// Request desired, pretend like we got the block immediately.
|
||||
func (p testPeer) simulateInput(input inputData) {
|
||||
block := &types.Block{Header: types.Header{Height: input.request.Height}}
|
||||
input.pool.AddBlock(input.request.PeerID, block, 123)
|
||||
extCommit := &types.ExtendedCommit{
|
||||
Height: input.request.Height,
|
||||
}
|
||||
_ = input.pool.AddBlock(input.request.PeerID, block, extCommit, 123)
|
||||
// TODO: uncommenting this creates a race which is detected by:
|
||||
// https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856
|
||||
// see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890
|
||||
@@ -112,7 +115,7 @@ func TestBlockPoolBasic(t *testing.T) {
|
||||
if !pool.IsRunning() {
|
||||
return
|
||||
}
|
||||
first, second := pool.PeekTwoBlocks()
|
||||
first, second, _ := pool.PeekTwoBlocks()
|
||||
if first != nil && second != nil {
|
||||
pool.PopRequest()
|
||||
} else {
|
||||
@@ -171,7 +174,7 @@ func TestBlockPoolTimeout(t *testing.T) {
|
||||
if !pool.IsRunning() {
|
||||
return
|
||||
}
|
||||
first, second := pool.PeekTwoBlocks()
|
||||
first, second, _ := pool.PeekTwoBlocks()
|
||||
if first != nil && second != nil {
|
||||
pool.PopRequest()
|
||||
} else {
|
||||
|
||||
@@ -52,7 +52,7 @@ type Reactor struct {
|
||||
initialState sm.State
|
||||
|
||||
blockExec *sm.BlockExecutor
|
||||
store *store.BlockStore
|
||||
store sm.BlockStore
|
||||
pool *BlockPool
|
||||
blockSync bool
|
||||
|
||||
@@ -172,34 +172,44 @@ func (bcR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// respondToPeer loads a block and sends it to the requesting peer,
|
||||
// if we have it. Otherwise, we'll respond saying we don't have it.
|
||||
func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
|
||||
src p2p.Peer) (queued bool) {
|
||||
src p2p.Peer) error {
|
||||
|
||||
block := bcR.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
extCommit := bcR.store.LoadBlockExtendedCommit(msg.Height)
|
||||
if extCommit == nil {
|
||||
return fmt.Errorf("found block in store without extended commit: %v", block)
|
||||
}
|
||||
bl, err := block.ToProto()
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return false
|
||||
return fmt.Errorf("failed to convert block to protobuf: %w", err)
|
||||
}
|
||||
|
||||
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
|
||||
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{
|
||||
Block: bl,
|
||||
ExtCommit: extCommit.ToProto(),
|
||||
})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not marshal msg", "err", err)
|
||||
return false
|
||||
return fmt.Errorf("could not marshal msg: %w", err)
|
||||
}
|
||||
|
||||
return src.TrySend(BlocksyncChannel, msgBytes)
|
||||
if !src.TrySend(BlocksyncChannel, msgBytes) {
|
||||
return fmt.Errorf("unable to queue blocksync message at height %d to peer %v", msg.Height, src)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
|
||||
|
||||
msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return false
|
||||
return fmt.Errorf("could not convert msg to protobuf: %w", err)
|
||||
}
|
||||
|
||||
return src.TrySend(BlocksyncChannel, msgBytes)
|
||||
if !src.TrySend(BlocksyncChannel, msgBytes) {
|
||||
return fmt.Errorf("unable to queue blocksync message at height %d to peer %v", msg.Height, src)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling 4 types of messages (look below).
|
||||
@@ -223,12 +233,22 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
case *bcproto.BlockRequest:
|
||||
bcR.respondToPeer(msg, src)
|
||||
case *bcproto.BlockResponse:
|
||||
bi, err := types.BlockFromProto(msg.Block)
|
||||
block, err := types.BlockFromProto(msg.Block)
|
||||
if err != nil {
|
||||
bcR.Logger.Error("Block content is invalid", "err", err)
|
||||
return
|
||||
}
|
||||
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
|
||||
extCommit, err := types.ExtendedCommitFromProto(msg.ExtCommit)
|
||||
if err != nil {
|
||||
bcR.Logger.Error("failed to convert extended commit from proto",
|
||||
"peer", src,
|
||||
"err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := bcR.pool.AddBlock(src.ID(), block, extCommit, block.Size()); err != nil {
|
||||
bcR.Logger.Error("failed to add block", "err", err)
|
||||
}
|
||||
case *bcproto.StatusRequest:
|
||||
// Send peer our state.
|
||||
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
|
||||
@@ -236,7 +256,7 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
Base: bcR.store.Base(),
|
||||
})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobut", "err", err)
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return
|
||||
}
|
||||
src.TrySend(BlocksyncChannel, msgBytes)
|
||||
@@ -317,7 +337,20 @@ FOR_LOOP:
|
||||
outbound, inbound, _ := bcR.Switch.NumPeers()
|
||||
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
|
||||
"outbound", outbound, "inbound", inbound)
|
||||
if bcR.pool.IsCaughtUp() {
|
||||
switch {
|
||||
// TODO(sergio) Might be needed for implementing the upgrading solution. Remove after that
|
||||
//case state.LastBlockHeight > 0 && r.store.LoadBlockExtCommit(state.LastBlockHeight) == nil:
|
||||
case state.LastBlockHeight > 0 && blocksSynced == 0:
|
||||
// Having state-synced, we need to blocksync at least one block
|
||||
bcR.Logger.Info(
|
||||
"no seen commit yet",
|
||||
"height", height,
|
||||
"last_block_height", state.LastBlockHeight,
|
||||
"initial_height", state.InitialHeight,
|
||||
"max_peer_height", bcR.pool.MaxPeerHeight(),
|
||||
)
|
||||
continue FOR_LOOP
|
||||
case bcR.pool.IsCaughtUp():
|
||||
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
|
||||
if err := bcR.pool.Stop(); err != nil {
|
||||
bcR.Logger.Error("Error stopping pool", "err", err)
|
||||
@@ -349,10 +382,13 @@ FOR_LOOP:
|
||||
// routine.
|
||||
|
||||
// See if there are any blocks to sync.
|
||||
first, second := bcR.pool.PeekTwoBlocks()
|
||||
// bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
|
||||
if first == nil || second == nil {
|
||||
// We need both to sync the first block.
|
||||
first, second, extCommit := bcR.pool.PeekTwoBlocks()
|
||||
if first == nil || second == nil || extCommit == nil {
|
||||
if first != nil && extCommit == nil {
|
||||
// See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631
|
||||
panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
|
||||
}
|
||||
// we need all to sync the first block
|
||||
continue FOR_LOOP
|
||||
} else {
|
||||
// Try again quickly next loop.
|
||||
@@ -372,6 +408,7 @@ FOR_LOOP:
|
||||
// NOTE: we can probably make this more efficient, but note that calling
|
||||
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
|
||||
// currently necessary.
|
||||
// TODO(sergio): Should we also validate against the extended commit?
|
||||
err = state.Validators.VerifyCommitLight(
|
||||
chainID, firstID, first.Height, second.LastCommit)
|
||||
|
||||
@@ -402,7 +439,7 @@ FOR_LOOP:
|
||||
bcR.pool.PopRequest()
|
||||
|
||||
// TODO: batch saves so we dont persist to disk every block
|
||||
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
|
||||
bcR.store.SaveBlockWithExtendedCommit(first, firstParts, extCommit)
|
||||
|
||||
// TODO: same thing for app - but we would need a way to
|
||||
// get the hash without persisting the state
|
||||
|
||||
@@ -104,40 +104,44 @@ func newReactor(
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// The commit we are building for the current height.
|
||||
seenExtCommit := &types.ExtendedCommit{}
|
||||
|
||||
// let's add some blocks in
|
||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
|
||||
if blockHeight > 1 {
|
||||
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
|
||||
lastBlock := blockStore.LoadBlock(blockHeight - 1)
|
||||
|
||||
vote, err := types.MakeVote(
|
||||
lastBlock.Header.Height,
|
||||
lastBlockMeta.BlockID,
|
||||
state.Validators,
|
||||
privVals[0],
|
||||
lastBlock.Header.ChainID,
|
||||
time.Now(),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
lastCommit = types.NewCommit(vote.Height, vote.Round,
|
||||
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
|
||||
}
|
||||
|
||||
lastCommit := seenExtCommit.Clone().ToCommit()
|
||||
thisBlock := state.MakeBlock(blockHeight, nil, lastCommit, nil, state.Validators.Proposer.Address)
|
||||
|
||||
thisParts, err := thisBlock.MakePartSet(types.BlockPartSizeBytes)
|
||||
require.NoError(t, err)
|
||||
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
|
||||
|
||||
vote, err := types.MakeVote(
|
||||
thisBlock.Header.Height,
|
||||
blockID,
|
||||
state.Validators,
|
||||
privVals[0],
|
||||
thisBlock.Header.ChainID,
|
||||
time.Now(),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
seenExtCommit = &types.ExtendedCommit{
|
||||
Height: vote.Height,
|
||||
Round: vote.Round,
|
||||
BlockID: blockID,
|
||||
ExtendedSignatures: []types.ExtendedCommitSig{vote.ExtendedCommitSig()},
|
||||
}
|
||||
|
||||
blockStore.SaveBlockWithExtendedCommit(thisBlock, thisParts, seenExtCommit)
|
||||
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error apply block: %w", err))
|
||||
}
|
||||
|
||||
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
|
||||
if err = stateStore.Save(state); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
|
||||
Reference in New Issue
Block a user