separate extended commit save

This commit is contained in:
William Banfield
2022-05-17 20:37:59 -04:00
parent c3686fea00
commit 588a310049
10 changed files with 147 additions and 16 deletions

View File

@@ -601,7 +601,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
r.pool.PopRequest()
// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(first, firstParts, extCommit)
if state.ConsensusParams.ABCI.VoteExtensionsEnabled(state.LastBlockHeight) {
r.store.SaveBlockWithExtendedCommit(first, firstParts, extCommit)
} else {
r.store.SaveBlock(first, firstParts, extCommit.ToCommit())
}
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.

View File

@@ -183,7 +183,7 @@ func (rts *reactorTestSuite) addNode(
state, err = blockExec.ApplyBlock(ctx, state, blockID, thisBlock)
require.NoError(t, err)
blockStore.SaveBlock(thisBlock, thisParts, seenExtCommit)
blockStore.SaveBlockWithExtendedCommit(thisBlock, thisParts, seenExtCommit)
}
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)

View File

@@ -686,7 +686,7 @@ func TestSwitchToConsensusVoteExtensions(t *testing.T) {
added, err := voteSet.AddVote(signedVote)
require.NoError(t, err)
require.True(t, added)
cs.blockStore.SaveBlock(propBlock, blockParts, voteSet.MakeExtendedCommit())
cs.blockStore.SaveBlockWithExtendedCommit(propBlock, blockParts, voteSet.MakeExtendedCommit())
reactor := NewReactor(
log.NewNopLogger(),
cs,

View File

@@ -1204,7 +1204,9 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
}
}
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
func (bs *mockBlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
}
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {

View File

@@ -1956,8 +1956,12 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
if cs.blockStore.Height() < block.Height {
// NOTE: the seenCommit is local justification to commit this block,
// but may differ from the LastCommit included in the next block
precommits := cs.Votes.Precommits(cs.CommitRound)
cs.blockStore.SaveBlock(block, blockParts, precommits.MakeExtendedCommit())
seenExtendedCommit := cs.Votes.Precommits(cs.CommitRound).MakeExtendedCommit()
if cs.state.ConsensusParams.ABCI.VoteExtensionsEnabled(block.Height) {
cs.blockStore.SaveBlockWithExtendedCommit(block, blockParts, seenExtendedCommit)
} else {
cs.blockStore.SaveBlock(block, blockParts, seenExtendedCommit.ToCommit())
}
} else {
// Happens during replay if we already saved the block but didn't commit
logger.Debug("calling finalizeCommit on already stored block", "height", block.Height)

View File

@@ -580,7 +580,7 @@ func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) (*store.Blo
}
seenCommit := makeExtCommit(i, valAddr)
blockStore.SaveBlock(block, partSet, seenCommit)
blockStore.SaveBlockWithExtendedCommit(block, partSet, seenCommit)
}
return blockStore, nil

View File

@@ -209,7 +209,12 @@ func (_m *BlockStore) PruneBlocks(height int64) (uint64, error) {
}
// SaveBlock provides a mock function with given fields: block, blockParts, seenCommit
func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
_m.Called(block, blockParts, seenCommit)
}
// SaveBlockWithExtendedCommit provides a mock function with given fields: block, blockParts, seenCommit
func (_m *BlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
_m.Called(block, blockParts, seenCommit)
}

View File

@@ -26,7 +26,8 @@ type BlockStore interface {
LoadBlockMeta(height int64) *types.BlockMeta
LoadBlock(height int64) *types.Block
SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit)
SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit)
PruneBlocks(height int64) (uint64, error)

View File

@@ -2,11 +2,13 @@ package store
import (
"bytes"
"errors"
"fmt"
"strconv"
"github.com/gogo/protobuf/proto"
"github.com/google/orderedcode"
db "github.com/tendermint/tm-db"
dbm "github.com/tendermint/tm-db"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -466,7 +468,7 @@ func (bs *BlockStore) batchDelete(
// If all the nodes restart after committing a block,
// we need this to reload the precommits to catch-up nodes to the
// most recent height. Otherwise they'd stall at H-1.
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
func (bs *BlockStore) SaveBlockWithExtensions(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
if block == nil {
panic("BlockStore can only save a non-nil block")
}
@@ -539,6 +541,119 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
}
}
// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.
// blockParts: Must be parts of the block
// seenCommit: The +2/3 precommits that were seen which committed at height.
// If all the nodes restart after committing a block,
// we need this to reload the precommits to catch-up nodes to the
// most recent height. Otherwise they'd stall at H-1.
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
if block == nil {
panic("BlockStore can only save a non-nil block")
}
batch := bs.db.NewBatch()
if err := bs.saveBlockToBatch(batch, block, blockParts, seenCommit); err != nil {
panic(err)
}
if err := batch.WriteSync(); err != nil {
panic(err)
}
if err := batch.Close(); err != nil {
panic(err)
}
}
//SaveBlockWithExtendedCommit saves the block along with the extended commit data.
func (bs *BlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) {
if block == nil {
panic("BlockStore can only save a non-nil block")
}
batch := bs.db.NewBatch()
if err := bs.saveBlockToBatch(batch, block, blockParts, seenCommit.ToCommit()); err != nil {
panic(err)
}
height := block.Height
pbec := seenCommit.ToProto()
extCommitBytes := mustEncode(pbec)
if err := batch.Set(extCommitKey(height), extCommitBytes); err != nil {
panic(err)
}
if err := batch.WriteSync(); err != nil {
panic(err)
}
if err := batch.Close(); err != nil {
panic(err)
}
}
func (bs *BlockStore) saveBlockToBatch(batch db.Batch, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error {
if block == nil {
panic("BlockStore can only save a non-nil block")
}
height := block.Height
hash := block.Hash()
if g, w := height, bs.Height()+1; bs.Base() > 0 && g != w {
return fmt.Errorf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g)
}
if !blockParts.IsComplete() {
return errors.New("BlockStore can only save complete block part sets")
}
if height != seenCommit.Height {
return fmt.Errorf("BlockStore cannot save seen commit of a different height (block: %d, commit: %d)", height, seenCommit.Height)
}
// Save block parts. This must be done before the block meta, since callers
// typically load the block meta first as an indication that the block exists
// and then go on to load block parts - we must make sure the block is
// complete as soon as the block meta is written.
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part, batch)
}
blockMeta := types.NewBlockMeta(block, blockParts)
pbm := blockMeta.ToProto()
if pbm == nil {
return errors.New("nil blockmeta")
}
metaBytes := mustEncode(pbm)
if err := batch.Set(blockMetaKey(height), metaBytes); err != nil {
return err
}
if err := batch.Set(blockHashKey(hash), []byte(fmt.Sprintf("%d", height))); err != nil {
return err
}
pbc := block.LastCommit.ToProto()
blockCommitBytes := mustEncode(pbc)
if err := batch.Set(blockCommitKey(height-1), blockCommitBytes); err != nil {
return err
}
// Save seen commit (seen +2/3 precommits for block)
pbsc := seenCommit.ToProto()
seenCommitBytes := mustEncode(pbsc)
if err := batch.Set(seenCommitKey(), seenCommitBytes); err != nil {
return err
}
pbec := seenCommit.ToProto()
extCommitBytes := mustEncode(pbec)
if err := batch.Set(extCommitKey(height), extCommitBytes); err != nil {
return err
}
return nil
}
func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part, batch dbm.Batch) {
pbp, err := part.ToProto()
if err != nil {

View File

@@ -89,7 +89,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) {
part2 := validPartSet.GetPart(1)
seenCommit := makeTestExtCommit(block.Header.Height, tmtime.Now())
bs.SaveBlock(block, validPartSet, seenCommit)
bs.SaveBlockWithExtendedCommit(block, validPartSet, seenCommit)
require.EqualValues(t, 1, bs.Base(), "expecting the new height to be changed")
require.EqualValues(t, block.Header.Height, bs.Height(), "expecting the new height to be changed")
@@ -209,7 +209,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) {
bs, db := newInMemoryBlockStore()
// SaveBlock
res, err, panicErr := doFn(func() (interface{}, error) {
bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit)
bs.SaveBlockWithExtendedCommit(tuple.block, tuple.parts, tuple.seenCommit)
if tuple.block == nil {
return nil, nil
}
@@ -293,7 +293,7 @@ func TestLoadBaseMeta(t *testing.T) {
partSet, err := block.MakePartSet(2)
require.NoError(t, err)
seenCommit := makeTestExtCommit(h, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
bs.SaveBlockWithExtendedCommit(block, partSet, seenCommit)
}
pruned, err := bs.PruneBlocks(4)
@@ -371,7 +371,7 @@ func TestPruneBlocks(t *testing.T) {
partSet, err := block.MakePartSet(2)
require.NoError(t, err)
seenCommit := makeTestExtCommit(h, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
bs.SaveBlockWithExtendedCommit(block, partSet, seenCommit)
}
assert.EqualValues(t, 1, bs.Base())
@@ -479,7 +479,7 @@ func TestBlockFetchAtHeight(t *testing.T) {
partSet, err := block.MakePartSet(2)
require.NoError(t, err)
seenCommit := makeTestExtCommit(block.Header.Height, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
bs.SaveBlockWithExtendedCommit(block, partSet, seenCommit)
require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
blockAtHeight := bs.LoadBlock(bs.Height())
@@ -523,7 +523,7 @@ func TestSeenAndCanonicalCommit(t *testing.T) {
partSet, err := block.MakePartSet(2)
require.NoError(t, err)
seenCommit := makeTestExtCommit(h, tmtime.Now())
store.SaveBlock(block, partSet, seenCommit)
store.SaveBlockWithExtendedCommit(block, partSet, seenCommit)
c3 := store.LoadSeenCommit()
require.NotNil(t, c3)
require.Equal(t, h, c3.Height)