diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 9e81659be..8f8157829 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -601,7 +601,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh r.pool.PopRequest() // TODO: batch saves so we do not persist to disk every block - r.store.SaveBlock(first, firstParts, extCommit) + if state.ConsensusParams.ABCI.VoteExtensionsEnabled(state.LastBlockHeight) { + r.store.SaveBlockWithExtendedCommit(first, firstParts, extCommit) + } else { + r.store.SaveBlock(first, firstParts, extCommit.ToCommit()) + } // TODO: Same thing for app - but we would need a way to get the hash // without persisting the state. diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index fa5dc6b3b..d7a01c9de 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -183,7 +183,7 @@ func (rts *reactorTestSuite) addNode( state, err = blockExec.ApplyBlock(ctx, state, blockID, thisBlock) require.NoError(t, err) - blockStore.SaveBlock(thisBlock, thisParts, seenExtCommit) + blockStore.SaveBlockWithExtendedCommit(thisBlock, thisParts, seenExtCommit) } rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index f77173bbb..c6feb7eff 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -686,7 +686,7 @@ func TestSwitchToConsensusVoteExtensions(t *testing.T) { added, err := voteSet.AddVote(signedVote) require.NoError(t, err) require.True(t, added) - cs.blockStore.SaveBlock(propBlock, blockParts, voteSet.MakeExtendedCommit()) + cs.blockStore.SaveBlockWithExtendedCommit(propBlock, blockParts, voteSet.MakeExtendedCommit()) reactor := NewReactor( log.NewNopLogger(), cs, diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index be770327a..99d3c17a1 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -1204,7 +1204,9 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { } } func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } -func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { +func (bs *mockBlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { +} +func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { diff --git a/internal/consensus/state.go b/internal/consensus/state.go index bd166d410..b49226284 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1956,8 +1956,12 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { if cs.blockStore.Height() < block.Height { // NOTE: the seenCommit is local justification to commit this block, // but may differ from the LastCommit included in the next block - precommits := cs.Votes.Precommits(cs.CommitRound) - cs.blockStore.SaveBlock(block, blockParts, precommits.MakeExtendedCommit()) + seenExtendedCommit := cs.Votes.Precommits(cs.CommitRound).MakeExtendedCommit() + if cs.state.ConsensusParams.ABCI.VoteExtensionsEnabled(block.Height) { + cs.blockStore.SaveBlockWithExtendedCommit(block, blockParts, seenExtendedCommit) + } else { + cs.blockStore.SaveBlock(block, blockParts, seenExtendedCommit.ToCommit()) + } } else { // Happens during replay if we already saved the block but didn't commit logger.Debug("calling finalizeCommit on already stored block", "height", block.Height) diff --git a/internal/evidence/pool_test.go b/internal/evidence/pool_test.go index dbf7d3862..2e1938978 100644 --- a/internal/evidence/pool_test.go +++ b/internal/evidence/pool_test.go @@ -580,7 +580,7 @@ func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) (*store.Blo } seenCommit := makeExtCommit(i, valAddr) - blockStore.SaveBlock(block, partSet, seenCommit) + blockStore.SaveBlockWithExtendedCommit(block, partSet, seenCommit) } return blockStore, nil diff --git a/internal/state/mocks/block_store.go b/internal/state/mocks/block_store.go index 4eafb1273..58fc640fc 100644 --- a/internal/state/mocks/block_store.go +++ b/internal/state/mocks/block_store.go @@ -209,7 +209,12 @@ func (_m *BlockStore) PruneBlocks(height int64) (uint64, error) { } // SaveBlock provides a mock function with given fields: block, blockParts, seenCommit -func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { +func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { + _m.Called(block, blockParts, seenCommit) +} + +// SaveBlockWithExtendedCommit provides a mock function with given fields: block, blockParts, seenCommit +func (_m *BlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { _m.Called(block, blockParts, seenCommit) } diff --git a/internal/state/services.go b/internal/state/services.go index 35a91aa11..f86c4e3cf 100644 --- a/internal/state/services.go +++ b/internal/state/services.go @@ -26,7 +26,8 @@ type BlockStore interface { LoadBlockMeta(height int64) *types.BlockMeta LoadBlock(height int64) *types.Block - SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) + SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) PruneBlocks(height int64) (uint64, error) diff --git a/internal/store/store.go b/internal/store/store.go index 81d79f8b7..c18a09c5c 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -2,11 +2,13 @@ package store import ( "bytes" + "errors" "fmt" "strconv" "github.com/gogo/protobuf/proto" "github.com/google/orderedcode" + db "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" @@ -466,7 +468,7 @@ func (bs *BlockStore) batchDelete( // If all the nodes restart after committing a block, // we need this to reload the precommits to catch-up nodes to the // most recent height. Otherwise they'd stall at H-1. -func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { +func (bs *BlockStore) SaveBlockWithExtensions(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { if block == nil { panic("BlockStore can only save a non-nil block") } @@ -539,6 +541,119 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s } } +// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db. +// blockParts: Must be parts of the block +// seenCommit: The +2/3 precommits that were seen which committed at height. +// If all the nodes restart after committing a block, +// we need this to reload the precommits to catch-up nodes to the +// most recent height. Otherwise they'd stall at H-1. +func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { + if block == nil { + panic("BlockStore can only save a non-nil block") + } + batch := bs.db.NewBatch() + if err := bs.saveBlockToBatch(batch, block, blockParts, seenCommit); err != nil { + panic(err) + } + + if err := batch.WriteSync(); err != nil { + panic(err) + } + + if err := batch.Close(); err != nil { + panic(err) + } +} + +//SaveBlockWithExtendedCommit saves the block along with the extended commit data. +func (bs *BlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts *types.PartSet, seenCommit *types.ExtendedCommit) { + if block == nil { + panic("BlockStore can only save a non-nil block") + } + batch := bs.db.NewBatch() + if err := bs.saveBlockToBatch(batch, block, blockParts, seenCommit.ToCommit()); err != nil { + panic(err) + } + height := block.Height + + pbec := seenCommit.ToProto() + extCommitBytes := mustEncode(pbec) + if err := batch.Set(extCommitKey(height), extCommitBytes); err != nil { + panic(err) + } + + if err := batch.WriteSync(); err != nil { + panic(err) + } + + if err := batch.Close(); err != nil { + panic(err) + } +} + +func (bs *BlockStore) saveBlockToBatch(batch db.Batch, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error { + if block == nil { + panic("BlockStore can only save a non-nil block") + } + + height := block.Height + hash := block.Hash() + + if g, w := height, bs.Height()+1; bs.Base() > 0 && g != w { + return fmt.Errorf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g) + } + if !blockParts.IsComplete() { + return errors.New("BlockStore can only save complete block part sets") + } + if height != seenCommit.Height { + return fmt.Errorf("BlockStore cannot save seen commit of a different height (block: %d, commit: %d)", height, seenCommit.Height) + } + + // Save block parts. This must be done before the block meta, since callers + // typically load the block meta first as an indication that the block exists + // and then go on to load block parts - we must make sure the block is + // complete as soon as the block meta is written. + for i := 0; i < int(blockParts.Total()); i++ { + part := blockParts.GetPart(i) + bs.saveBlockPart(height, i, part, batch) + } + + blockMeta := types.NewBlockMeta(block, blockParts) + pbm := blockMeta.ToProto() + if pbm == nil { + return errors.New("nil blockmeta") + } + + metaBytes := mustEncode(pbm) + if err := batch.Set(blockMetaKey(height), metaBytes); err != nil { + return err + } + + if err := batch.Set(blockHashKey(hash), []byte(fmt.Sprintf("%d", height))); err != nil { + return err + } + + pbc := block.LastCommit.ToProto() + blockCommitBytes := mustEncode(pbc) + if err := batch.Set(blockCommitKey(height-1), blockCommitBytes); err != nil { + return err + } + + // Save seen commit (seen +2/3 precommits for block) + pbsc := seenCommit.ToProto() + seenCommitBytes := mustEncode(pbsc) + if err := batch.Set(seenCommitKey(), seenCommitBytes); err != nil { + return err + } + + pbec := seenCommit.ToProto() + extCommitBytes := mustEncode(pbec) + if err := batch.Set(extCommitKey(height), extCommitBytes); err != nil { + return err + } + return nil +} + func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part, batch dbm.Batch) { pbp, err := part.ToProto() if err != nil { diff --git a/internal/store/store_test.go b/internal/store/store_test.go index 9c34a603a..3a8ced5e1 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -89,7 +89,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { part2 := validPartSet.GetPart(1) seenCommit := makeTestExtCommit(block.Header.Height, tmtime.Now()) - bs.SaveBlock(block, validPartSet, seenCommit) + bs.SaveBlockWithExtendedCommit(block, validPartSet, seenCommit) require.EqualValues(t, 1, bs.Base(), "expecting the new height to be changed") require.EqualValues(t, block.Header.Height, bs.Height(), "expecting the new height to be changed") @@ -209,7 +209,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { bs, db := newInMemoryBlockStore() // SaveBlock res, err, panicErr := doFn(func() (interface{}, error) { - bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit) + bs.SaveBlockWithExtendedCommit(tuple.block, tuple.parts, tuple.seenCommit) if tuple.block == nil { return nil, nil } @@ -293,7 +293,7 @@ func TestLoadBaseMeta(t *testing.T) { partSet, err := block.MakePartSet(2) require.NoError(t, err) seenCommit := makeTestExtCommit(h, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + bs.SaveBlockWithExtendedCommit(block, partSet, seenCommit) } pruned, err := bs.PruneBlocks(4) @@ -371,7 +371,7 @@ func TestPruneBlocks(t *testing.T) { partSet, err := block.MakePartSet(2) require.NoError(t, err) seenCommit := makeTestExtCommit(h, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + bs.SaveBlockWithExtendedCommit(block, partSet, seenCommit) } assert.EqualValues(t, 1, bs.Base()) @@ -479,7 +479,7 @@ func TestBlockFetchAtHeight(t *testing.T) { partSet, err := block.MakePartSet(2) require.NoError(t, err) seenCommit := makeTestExtCommit(block.Header.Height, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) + bs.SaveBlockWithExtendedCommit(block, partSet, seenCommit) require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") blockAtHeight := bs.LoadBlock(bs.Height()) @@ -523,7 +523,7 @@ func TestSeenAndCanonicalCommit(t *testing.T) { partSet, err := block.MakePartSet(2) require.NoError(t, err) seenCommit := makeTestExtCommit(h, tmtime.Now()) - store.SaveBlock(block, partSet, seenCommit) + store.SaveBlockWithExtendedCommit(block, partSet, seenCommit) c3 := store.LoadSeenCommit() require.NotNil(t, c3) require.Equal(t, h, c3.Height)