mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-06 05:25:35 +00:00
blockstore: fix race conditions when loading data (#5382)
Fixes #5377 and comments in #4588 (review).
This commit is contained in:
@@ -52,6 +52,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
|
||||
- [consensus] \#5329 Fix wrong proposer schedule for validators returned by `InitChain` (@erikgrinaker)
|
||||
|
||||
- [store] \#5382 Fix race conditions when loading/saving/pruning blocks (@erikgrinaker)
|
||||
|
||||
- [light] [\#5307](https://github.com/tendermint/tendermint/pull/5307) Persist correct proposer priority in light client validator sets (@cmwaters)
|
||||
|
||||
- [docker] Fix incorrect `time_iota_ms` configuration in default image (@erikgrinaker)
|
||||
|
||||
@@ -671,13 +671,14 @@ OUTER_LOOP:
|
||||
|
||||
// Catchup logic
|
||||
// If peer is lagging by more than 1, send Commit.
|
||||
if prs.Height != 0 && rs.Height >= prs.Height+2 {
|
||||
if prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= conR.conS.blockStore.Base() {
|
||||
// Load the block commit for prs.Height,
|
||||
// which contains precommit signatures for prs.Height.
|
||||
commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
|
||||
if ps.PickSendVote(commit) {
|
||||
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
|
||||
continue OUTER_LOOP
|
||||
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
|
||||
if ps.PickSendVote(commit) {
|
||||
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1187,6 +1187,7 @@ func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mock
|
||||
func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
|
||||
func (bs *mockBlockStore) Base() int64 { return bs.base }
|
||||
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
|
||||
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
|
||||
func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
|
||||
func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block {
|
||||
return bs.chain[int64(len(bs.chain))-1]
|
||||
|
||||
@@ -122,6 +122,7 @@ type mockBlockStore struct {
|
||||
func (mockBlockStore) Base() int64 { return 1 }
|
||||
func (store mockBlockStore) Height() int64 { return store.height }
|
||||
func (store mockBlockStore) Size() int64 { return store.height }
|
||||
func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil }
|
||||
func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil }
|
||||
func (mockBlockStore) LoadBlock(height int64) *types.Block { return nil }
|
||||
func (mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { return nil }
|
||||
|
||||
@@ -15,14 +15,14 @@ import (
|
||||
// More: https://docs.tendermint.com/master/rpc/#/Info/status
|
||||
func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
|
||||
var (
|
||||
earliestBlockHeight int64
|
||||
earliestBlockHash tmbytes.HexBytes
|
||||
earliestAppHash tmbytes.HexBytes
|
||||
earliestBlockTimeNano int64
|
||||
|
||||
earliestBlockHeight = env.BlockStore.Base()
|
||||
)
|
||||
|
||||
if earliestBlockMeta := env.BlockStore.LoadBlockMeta(earliestBlockHeight); earliestBlockMeta != nil {
|
||||
if earliestBlockMeta := env.BlockStore.LoadBaseMeta(); earliestBlockMeta != nil {
|
||||
earliestBlockHeight = earliestBlockMeta.Header.Height
|
||||
earliestAppHash = earliestBlockMeta.Header.AppHash
|
||||
earliestBlockHash = earliestBlockMeta.BlockID.Hash
|
||||
earliestBlockTimeNano = earliestBlockMeta.Header.Time.UnixNano()
|
||||
@@ -37,8 +37,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
|
||||
)
|
||||
|
||||
if latestHeight != 0 {
|
||||
latestBlockMeta := env.BlockStore.LoadBlockMeta(latestHeight)
|
||||
if latestBlockMeta != nil {
|
||||
if latestBlockMeta := env.BlockStore.LoadBlockMeta(latestHeight); latestBlockMeta != nil {
|
||||
latestBlockHash = latestBlockMeta.BlockID.Hash
|
||||
latestAppHash = latestBlockMeta.Header.AppHash
|
||||
latestBlockTimeNano = latestBlockMeta.Header.Time.UnixNano()
|
||||
|
||||
@@ -21,6 +21,7 @@ type BlockStore interface {
|
||||
Height() int64
|
||||
Size() int64
|
||||
|
||||
LoadBaseMeta() *types.BlockMeta
|
||||
LoadBlockMeta(height int64) *types.BlockMeta
|
||||
LoadBlock(height int64) *types.Block
|
||||
|
||||
|
||||
@@ -33,6 +33,11 @@ The store can be assumed to contain all contiguous blocks between base and heigh
|
||||
type BlockStore struct {
|
||||
db dbm.DB
|
||||
|
||||
// mtx guards access to the struct fields listed below it. We rely on the database to enforce
|
||||
// fine-grained concurrency control for its data, and thus this mutex does not apply to
|
||||
// database contents. The only reason for keeping these fields in the struct is that the data
|
||||
// can't efficiently be queried from the database since the key encoding we use is not
|
||||
// lexicographically ordered (see https://github.com/tendermint/tendermint/issues/4567).
|
||||
mtx tmsync.RWMutex
|
||||
base int64
|
||||
height int64
|
||||
@@ -73,6 +78,16 @@ func (bs *BlockStore) Size() int64 {
|
||||
return bs.height - bs.base + 1
|
||||
}
|
||||
|
||||
// LoadBase atomically loads the base block meta, or returns nil if no base is found.
|
||||
func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta {
|
||||
bs.mtx.RLock()
|
||||
defer bs.mtx.RUnlock()
|
||||
if bs.base == 0 {
|
||||
return nil
|
||||
}
|
||||
return bs.LoadBlockMeta(bs.base)
|
||||
}
|
||||
|
||||
// LoadBlock returns the block with the given height.
|
||||
// If no block is found for that height, it returns nil.
|
||||
func (bs *BlockStore) LoadBlock(height int64) *types.Block {
|
||||
@@ -85,6 +100,11 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block {
|
||||
buf := []byte{}
|
||||
for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ {
|
||||
part := bs.LoadBlockPart(height, i)
|
||||
// If the part is missing (e.g. since it has been deleted after we
|
||||
// loaded the block meta) we consider the whole block to be missing.
|
||||
if part == nil {
|
||||
return nil
|
||||
}
|
||||
buf = append(buf, part.Bytes...)
|
||||
}
|
||||
err := proto.Unmarshal(buf, pbb)
|
||||
@@ -323,6 +343,15 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
|
||||
panic("BlockStore can only save complete block part sets")
|
||||
}
|
||||
|
||||
// Save block parts. This must be done before the block meta, since callers
|
||||
// typically load the block meta first as an indication that the block exists
|
||||
// and then go on to load block parts - we must make sure the block is
|
||||
// complete as soon as the block meta is written.
|
||||
for i := 0; i < int(blockParts.Total()); i++ {
|
||||
part := blockParts.GetPart(i)
|
||||
bs.saveBlockPart(height, i, part)
|
||||
}
|
||||
|
||||
// Save block meta
|
||||
blockMeta := types.NewBlockMeta(block, blockParts)
|
||||
pbm := blockMeta.ToProto()
|
||||
@@ -337,12 +366,6 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Save block parts
|
||||
for i := 0; i < int(blockParts.Total()); i++ {
|
||||
part := blockParts.GetPart(i)
|
||||
bs.saveBlockPart(height, i, part)
|
||||
}
|
||||
|
||||
// Save block commit (duplicate and separate from the Block)
|
||||
pbc := block.LastCommit.ToProto()
|
||||
blockCommitBytes := mustEncode(pbc)
|
||||
|
||||
@@ -366,6 +366,29 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBaseMeta(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
stateStore := sm.NewStore(dbm.NewMemDB())
|
||||
state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile())
|
||||
require.NoError(t, err)
|
||||
bs := NewBlockStore(dbm.NewMemDB())
|
||||
|
||||
for h := int64(1); h <= 10; h++ {
|
||||
block := makeBlock(h, state, new(types.Commit))
|
||||
partSet := block.MakePartSet(2)
|
||||
seenCommit := makeTestCommit(h, tmtime.Now())
|
||||
bs.SaveBlock(block, partSet, seenCommit)
|
||||
}
|
||||
|
||||
_, err = bs.PruneBlocks(4)
|
||||
require.NoError(t, err)
|
||||
|
||||
baseBlock := bs.LoadBaseMeta()
|
||||
assert.EqualValues(t, 4, baseBlock.Header.Height)
|
||||
assert.EqualValues(t, 4, bs.Base())
|
||||
}
|
||||
|
||||
func TestLoadBlockPart(t *testing.T) {
|
||||
bs, db := freshBlockStore()
|
||||
height, index := int64(10), 1
|
||||
|
||||
Reference in New Issue
Block a user