add support for block pruning via ABCI Commit response (#4588)

* Added BlockStore.DeleteBlock()

* Added initial block pruner prototype

* wip

* Added BlockStore.PruneBlocks()

* Added consensus setting for block pruning

* Added BlockStore base

* Error on replay if base does not have blocks

* Handle missing blocks when sending VoteSetMaj23Message

* Error message tweak

* Properly update blockstore state

* Error message fix again

* blockchain: ignore peer missing blocks

* Added FIXME

* Added test for block replay with truncated history

* Handle peer base in blockchain reactor

* Improved replay error handling

* Added tests for Store.PruneBlocks()

* Fix non-RPC handling of truncated block history

* Panic on missing block meta in needProofBlock()

* Updated changelog

* Handle truncated block history in RPC layer

* Added info about earliest block in /status RPC

* Reorder height and base in blockchain reactor messages

* Updated changelog

* Fix tests

* Appease linter

* Minor review fixes

* Non-empty BlockStores should always have base > 0

* Update code to assume base > 0 invariant

* Added blockstore tests for pruning to 0

* Make sure we don't prune below the current base

* Added BlockStore.Size()

* config: added retain_blocks recommendations

* Update v1 blockchain reactor to handle blockstore base

* Added state database pruning

* Propagate errors on missing validator sets

* Comment tweaks

* Improved error message

Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>

* use ABCI field ResponseCommit.retain_height instead of retain-blocks config option

* remove State.RetainHeight, return value instead

* fix minor issues

* rename pruneHeights() to pruneBlocks()

* noop to fix GitHub borkage

Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
This commit is contained in:
Erik Grinaker
2020-04-03 10:38:32 +02:00
committed by GitHub
parent ce50dda66c
commit 4298bbcc4e
42 changed files with 1208 additions and 393 deletions

View File

@@ -284,16 +284,17 @@ func (pool *BlockPool) MaxPeerHeight() int64 {
return pool.maxPeerHeight
}
// SetPeerHeight sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
// SetPeerRange sets the peer's alleged blockchain base and height.
func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
peer := pool.peers[peerID]
if peer != nil {
peer.base = base
peer.height = height
} else {
peer = newBPPeer(pool, peerID, height)
peer = newBPPeer(pool, peerID, base, height)
peer.setLogger(pool.Logger.With("peer", peerID))
pool.peers[peerID] = peer
}
@@ -346,9 +347,9 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}
// Pick an available peer with at least the given minHeight.
// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(minHeight int64) *bpPeer {
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@@ -360,7 +361,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int64) *bpPeer {
if peer.numPending >= maxPendingRequestsPerPeer {
continue
}
if peer.height < minHeight {
if height < peer.base || height > peer.height {
continue
}
peer.incrPending()
@@ -432,6 +433,7 @@ type bpPeer struct {
didTimeout bool
numPending int32
height int64
base int64
pool *BlockPool
id p2p.ID
recvMonitor *flow.Monitor
@@ -441,10 +443,11 @@ type bpPeer struct {
logger log.Logger
}
func newBPPeer(pool *BlockPool, peerID p2p.ID, height int64) *bpPeer {
func newBPPeer(pool *BlockPool, peerID p2p.ID, base int64, height int64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
base: base,
height: height,
numPending: 0,
logger: log.NewNopLogger(),

View File

@@ -20,6 +20,7 @@ func init() {
type testPeer struct {
id p2p.ID
base int64
height int64
inputChan chan inputData //make sure each peer's data is sequential
}
@@ -67,7 +68,11 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
for i := 0; i < numPeers; i++ {
peerID := p2p.ID(tmrand.Str(12))
height := minHeight + tmrand.Int63n(maxHeight-minHeight)
peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)}
base := minHeight + int64(i)
if base > height {
base = height
}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
}
return peers
}
@@ -93,7 +98,7 @@ func TestBlockPoolBasic(t *testing.T) {
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerHeight(peer.id, peer.height)
pool.SetPeerRange(peer.id, peer.base, peer.height)
}
}()
@@ -148,7 +153,7 @@ func TestBlockPoolTimeout(t *testing.T) {
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerHeight(peer.id, peer.height)
pool.SetPeerRange(peer.id, peer.base, peer.height)
}
}()
@@ -192,7 +197,7 @@ func TestBlockPoolRemovePeer(t *testing.T) {
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, height, make(chan inputData)}
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
@@ -205,7 +210,7 @@ func TestBlockPoolRemovePeer(t *testing.T) {
// add peers
for peerID, peer := range peers {
pool.SetPeerHeight(peerID, peer.height)
pool.SetPeerRange(peerID, peer.base, peer.height)
}
assert.EqualValues(t, 10, pool.MaxPeerHeight())

View File

@@ -140,12 +140,15 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
// bcStatusResponseMessage from the peer and call pool.SetPeerRange
}
// RemovePeer implements Reactor by removing peer from the pool.
@@ -155,8 +158,6 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
// According to the Tendermint spec, if all nodes are honest,
// no node should be requesting for a block that's non-existent.
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {
@@ -196,11 +197,15 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
bcR.pool.AddBlock(src.ID(), msg.Block, len(msgBytes))
case *bcStatusRequestMessage:
// Send peer our state.
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
src.TrySend(BlockchainChannel, msgBytes)
src.TrySend(BlockchainChannel, cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
}))
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
bcR.pool.SetPeerHeight(src.ID(), msg.Height)
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcNoBlockResponseMessage:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -338,7 +343,7 @@ FOR_LOOP:
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
var err error
state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
@@ -360,9 +365,12 @@ FOR_LOOP:
}
}
// BroadcastStatusRequest broadcasts `BlockStore` height.
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()})
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
return nil
}
@@ -446,34 +454,48 @@ func (m *bcBlockResponseMessage) String() string {
type bcStatusRequestMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
return fmt.Sprintf("[bcStatusRequestMessage %v:%v]", m.Base, m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
return fmt.Sprintf("[bcStatusResponseMessage %v:%v]", m.Base, m.Height)
}

View File

@@ -112,7 +112,7 @@ func newBlockchainReactor(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(errors.Wrap(err, "error apply block"))
}

View File

@@ -27,6 +27,7 @@ type BpPeer struct {
logger log.Logger
ID p2p.ID
Base int64 // the peer reported base
Height int64 // the peer reported height
NumPendingBlockRequests int // number of requests still waiting for block responses
blocks map[int64]*types.Block // blocks received or expected to be received from this peer
@@ -38,14 +39,15 @@ type BpPeer struct {
}
// NewBpPeer creates a new peer.
func NewBpPeer(
peerID p2p.ID, height int64, onErr func(err error, peerID p2p.ID), params *BpPeerParams) *BpPeer {
func NewBpPeer(peerID p2p.ID, base int64, height int64,
onErr func(err error, peerID p2p.ID), params *BpPeerParams) *BpPeer {
if params == nil {
params = BpPeerDefaultParams()
}
return &BpPeer{
ID: peerID,
Base: base,
Height: height,
blocks: make(map[int64]*types.Block, maxRequestsPerPeer),
logger: log.NewNopLogger(),

View File

@@ -16,7 +16,7 @@ import (
func TestPeerMonitor(t *testing.T) {
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 10,
p2p.ID(tmrand.Str(12)), 0, 10,
func(err error, _ p2p.ID) {},
nil)
peer.SetLogger(log.TestingLogger())
@@ -35,7 +35,7 @@ func TestPeerResetBlockResponseTimer(t *testing.T) {
params := &BpPeerParams{timeout: 2 * time.Millisecond}
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 10,
p2p.ID(tmrand.Str(12)), 0, 10,
func(err error, _ p2p.ID) {
peerTestMtx.Lock()
defer peerTestMtx.Unlock()
@@ -75,7 +75,7 @@ func TestPeerRequestSent(t *testing.T) {
params := &BpPeerParams{timeout: 2 * time.Millisecond}
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 10,
p2p.ID(tmrand.Str(12)), 0, 10,
func(err error, _ p2p.ID) {},
params)
@@ -94,7 +94,7 @@ func TestPeerRequestSent(t *testing.T) {
func TestPeerGetAndRemoveBlock(t *testing.T) {
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 100,
p2p.ID(tmrand.Str(12)), 0, 100,
func(err error, _ p2p.ID) {},
nil)
@@ -142,7 +142,7 @@ func TestPeerGetAndRemoveBlock(t *testing.T) {
func TestPeerAddBlock(t *testing.T) {
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 100,
p2p.ID(tmrand.Str(12)), 0, 100,
func(err error, _ p2p.ID) {},
nil)
@@ -189,7 +189,7 @@ func TestPeerOnErrFuncCalledDueToExpiration(t *testing.T) {
)
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 10,
p2p.ID(tmrand.Str(12)), 0, 10,
func(err error, _ p2p.ID) {
peerTestMtx.Lock()
defer peerTestMtx.Unlock()
@@ -215,7 +215,7 @@ func TestPeerCheckRate(t *testing.T) {
minRecvRate: int64(100), // 100 bytes/sec exponential moving average
}
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 10,
p2p.ID(tmrand.Str(12)), 0, 10,
func(err error, _ p2p.ID) {},
params)
peer.SetLogger(log.TestingLogger())
@@ -249,7 +249,7 @@ func TestPeerCleanup(t *testing.T) {
params := &BpPeerParams{timeout: 2 * time.Millisecond}
peer := NewBpPeer(
p2p.ID(tmrand.Str(12)), 10,
p2p.ID(tmrand.Str(12)), 0, 10,
func(err error, _ p2p.ID) {},
params)
peer.SetLogger(log.TestingLogger())

View File

@@ -66,9 +66,9 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.MaxPeerHeight = newMax
}
// UpdatePeer adds a new peer or updates an existing peer with a new height.
// UpdatePeer adds a new peer or updates an existing peer with a new base and height.
// If a peer is short it is not added.
func (pool *BlockPool) UpdatePeer(peerID p2p.ID, height int64) error {
func (pool *BlockPool) UpdatePeer(peerID p2p.ID, base int64, height int64) error {
peer := pool.peers[peerID]
@@ -79,10 +79,10 @@ func (pool *BlockPool) UpdatePeer(peerID p2p.ID, height int64) error {
return errPeerTooShort
}
// Add new peer.
peer = NewBpPeer(peerID, height, pool.toBcR.sendPeerError, nil)
peer = NewBpPeer(peerID, base, height, pool.toBcR.sendPeerError, nil)
peer.SetLogger(pool.logger.With("peer", peerID))
pool.peers[peerID] = peer
pool.logger.Info("added peer", "peerID", peerID, "height", height, "num_peers", len(pool.peers))
pool.logger.Info("added peer", "peerID", peerID, "base", base, "height", height, "num_peers", len(pool.peers))
} else {
// Check if peer is lowering its height. This is not allowed.
if height < peer.Height {
@@ -90,6 +90,7 @@ func (pool *BlockPool) UpdatePeer(peerID p2p.ID, height int64) error {
return errPeerLowersItsHeight
}
// Update existing peer.
peer.Base = base
peer.Height = height
}
@@ -213,7 +214,7 @@ func (pool *BlockPool) sendRequest(height int64) bool {
if peer.NumPendingBlockRequests >= maxRequestsPerPeer {
continue
}
if peer.Height < height {
if peer.Base > height || peer.Height < height {
continue
}

View File

@@ -13,6 +13,7 @@ import (
type testPeer struct {
id p2p.ID
base int64
height int64
}
@@ -70,7 +71,7 @@ func makeBlockPool(bcr *testBcR, height int64, peers []BpPeer, blocks map[int64]
if p.Height > maxH {
maxH = p.Height
}
bPool.peers[p.ID] = NewBpPeer(p.ID, p.Height, bcr.sendPeerError, nil)
bPool.peers[p.ID] = NewBpPeer(p.ID, p.Base, p.Height, bcr.sendPeerError, nil)
bPool.peers[p.ID].SetLogger(bcr.logger)
}
@@ -93,6 +94,7 @@ func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*BpPeer, set2 map[p2
assert.NotNil(t, peer2)
assert.Equal(t, peer1.NumPendingBlockRequests, peer2.NumPendingBlockRequests)
assert.Equal(t, peer1.Height, peer2.Height)
assert.Equal(t, peer1.Base, peer2.Base)
assert.Equal(t, len(peer1.blocks), len(peer2.blocks))
for h, block1 := range peer1.blocks {
block2 := peer2.blocks[h]
@@ -123,26 +125,32 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
{
name: "add a first short peer",
pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 50},
args: testPeer{"P1", 0, 50},
errWanted: errPeerTooShort,
poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "add a first good peer",
pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 101},
args: testPeer{"P1", 0, 101},
poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 101}}, map[int64]tPBlocks{}),
},
{
name: "add a first good peer with base",
pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 10, 101},
poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Base: 10, Height: 101}}, map[int64]tPBlocks{}),
},
{
name: "increase the height of P1 from 120 to 123",
pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 123},
args: testPeer{"P1", 0, 123},
poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 123}}, map[int64]tPBlocks{}),
},
{
name: "decrease the height of P1 from 120 to 110",
pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 110},
args: testPeer{"P1", 0, 110},
errWanted: errPeerLowersItsHeight,
poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
@@ -151,7 +159,7 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 105}},
map[int64]tPBlocks{
100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}),
args: testPeer{"P1", 102},
args: testPeer{"P1", 0, 102},
errWanted: errPeerLowersItsHeight,
poolWanted: makeBlockPool(testBcR, 100, []BpPeer{},
map[int64]tPBlocks{}),
@@ -162,7 +170,7 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
pool := tt.pool
err := pool.UpdatePeer(tt.args.id, tt.args.height)
err := pool.UpdatePeer(tt.args.id, tt.args.base, tt.args.height)
assert.Equal(t, tt.errWanted, err)
assert.Equal(t, tt.poolWanted.blocks, tt.pool.blocks)
assertPeerSetsEquivalent(t, tt.poolWanted.peers, tt.pool.peers)
@@ -300,20 +308,34 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
testBcR := newTestBcR()
tests := []struct {
name string
pool *BlockPool
maxRequestsPerPeer int
expRequests map[int64]bool
expPeerResults []testPeerResult
expnumPendingBlockRequests int
name string
pool *BlockPool
maxRequestsPerPeer int
expRequests map[int64]bool
expRequestsSent int
expPeerResults []testPeerResult
}{
{
name: "one peer - send up to maxRequestsPerPeer block requests",
pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{{id: "P1", numPendingBlockRequests: 2}},
expnumPendingBlockRequests: 2,
name: "one peer - send up to maxRequestsPerPeer block requests",
pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expRequestsSent: 2,
expPeerResults: []testPeerResult{{id: "P1", numPendingBlockRequests: 2}},
},
{
name: "multiple peers - stops at gap between height and base",
pool: makeBlockPool(testBcR, 10, []BpPeer{
{ID: "P1", Base: 1, Height: 12},
{ID: "P2", Base: 15, Height: 100},
}, map[int64]tPBlocks{}),
maxRequestsPerPeer: 10,
expRequests: map[int64]bool{10: true, 11: true, 12: true},
expRequestsSent: 3,
expPeerResults: []testPeerResult{
{id: "P1", numPendingBlockRequests: 3},
{id: "P2", numPendingBlockRequests: 0},
},
},
{
name: "n peers - send n*maxRequestsPerPeer block requests",
@@ -324,10 +346,10 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
map[int64]tPBlocks{}),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expRequestsSent: 4,
expPeerResults: []testPeerResult{
{id: "P1", numPendingBlockRequests: 2},
{id: "P2", numPendingBlockRequests: 2}},
expnumPendingBlockRequests: 4,
},
}
@@ -339,15 +361,13 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
var pool = tt.pool
maxRequestsPerPeer = tt.maxRequestsPerPeer
pool.MakeNextRequests(10)
assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*len(pool.peers))
assert.Equal(t, tt.expRequestsSent, testResults.numRequestsSent)
for _, tPeer := range tt.expPeerResults {
var peer = pool.peers[tPeer.id]
assert.NotNil(t, peer)
assert.Equal(t, tPeer.numPendingBlockRequests, peer.NumPendingBlockRequests)
}
assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*len(pool.peers))
})
}
}

View File

@@ -169,7 +169,10 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
@@ -196,7 +199,10 @@ func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcBlockRequestMessage,
}
func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessage, src p2p.Peer) (queued bool) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
return src.TrySend(BlockchainChannel, msgBytes)
}
@@ -430,7 +436,7 @@ func (bcR *BlockchainReactor) processBlock() error {
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
if err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
@@ -441,7 +447,10 @@ func (bcR *BlockchainReactor) processBlock() error {
// Implements bcRNotifier
// sendStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) sendStatusRequest() {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()})
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
}
@@ -590,6 +599,7 @@ func (m *bcBlockResponseMessage) String() string {
type bcStatusRequestMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
@@ -597,17 +607,24 @@ func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
return fmt.Sprintf("[bcStatusRequestMessage %v:%v]", m.Base, m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
@@ -615,9 +632,15 @@ func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
return fmt.Sprintf("[bcStatusResponseMessage %v:%v]", m.Base, m.Height)
}

View File

@@ -58,6 +58,7 @@ func NewFSM(height int64, toBcR bcReactor) *BcReactorFSM {
type bReactorEventData struct {
peerID p2p.ID
err error // for peer error: timeout, slow; for processed block event if error occurred
base int64 // for status response
height int64 // for status response; for processed block event
block *types.Block // for block response
stateName string // for state timeout events
@@ -89,7 +90,7 @@ func (msg *bcReactorMessage) String() string {
case startFSMEv:
dataStr = ""
case statusResponseEv:
dataStr = fmt.Sprintf("peer=%v height=%v", msg.data.peerID, msg.data.height)
dataStr = fmt.Sprintf("peer=%v base=%v height=%v", msg.data.peerID, msg.data.base, msg.data.height)
case blockResponseEv:
dataStr = fmt.Sprintf("peer=%v block.height=%v length=%v",
msg.data.peerID, msg.data.block.Height, msg.data.length)
@@ -213,7 +214,7 @@ func init() {
return finished, errNoTallerPeer
case statusResponseEv:
if err := fsm.pool.UpdatePeer(data.peerID, data.height); err != nil {
if err := fsm.pool.UpdatePeer(data.peerID, data.base, data.height); err != nil {
if fsm.pool.NumPeers() == 0 {
return waitForPeer, err
}
@@ -246,7 +247,7 @@ func init() {
switch ev {
case statusResponseEv:
err := fsm.pool.UpdatePeer(data.peerID, data.height)
err := fsm.pool.UpdatePeer(data.peerID, data.base, data.height)
if fsm.pool.NumPeers() == 0 {
return waitForPeer, err
}

View File

@@ -131,7 +131,7 @@ func newBlockchainReactor(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(errors.Wrap(err, "error apply block"))
}

View File

@@ -14,7 +14,7 @@ type iIO interface {
sendBlockNotFound(height int64, peerID p2p.ID) error
sendStatusResponse(height int64, peerID p2p.ID) error
broadcastStatusRequest(height int64)
broadcastStatusRequest(base int64, height int64)
trySwitchToConsensus(state state.State, blocksSynced int)
}
@@ -104,8 +104,11 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, blocksSynced int) {
}
}
func (sio *switchIO) broadcastStatusRequest(height int64) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{height})
func (sio *switchIO) broadcastStatusRequest(base int64, height int64) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{
Base: base,
Height: height,
})
// XXX: maybe we should use an io specific peer list here
sio.sw.Broadcast(BlockchainChannel, msgBytes)
}

View File

@@ -29,7 +29,7 @@ func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContex
}
func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error {
newState, err := pc.applier.ApplyBlock(pc.state, blockID, block)
newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block)
pc.state = newState
return err
}

View File

@@ -72,41 +72,56 @@ func (m *bcBlockResponseMessage) String() string {
type bcStatusRequestMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
return fmt.Sprintf("[bcStatusRequestMessage %v:%v]", m.Base, m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
Base int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Base < 0 {
return errors.New("negative Base")
}
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Base > m.Height {
return fmt.Errorf("base %v cannot be greater than height %v", m.Base, m.Height)
}
return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
return fmt.Sprintf("[bcStatusResponseMessage %v:%v]", m.Base, m.Height)
}
type blockStore interface {
LoadBlock(height int64) *types.Block
SaveBlock(*types.Block, *types.PartSet, *types.Commit)
Base() int64
Height() int64
}
@@ -136,7 +151,7 @@ type blockVerifier interface {
//nolint:deadcode
type blockApplier interface {
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error)
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error)
}
// XXX: unify naming in this package around tmState
@@ -266,6 +281,7 @@ type bcStatusResponse struct {
priorityNormal
time time.Time
peerID p2p.ID
base int64
height int64
}
@@ -337,7 +353,7 @@ func (r *BlockchainReactor) demux() {
case <-doProcessBlockCh:
r.processor.send(rProcessBlock{})
case <-doStatusCh:
r.io.broadcastStatusRequest(r.SyncHeight())
r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight())
// Events from peers
case event := <-r.events:
@@ -483,7 +499,7 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
case *bcStatusResponseMessage:
r.events <- bcStatusResponse{peerID: src.ID(), height: msg.Height}
r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height}
case *bcBlockResponseMessage:
r.events <- bcBlockResponse{

View File

@@ -77,9 +77,9 @@ type mockBlockApplier struct {
}
// XXX: Add whitelist/blacklist?
func (mba *mockBlockApplier) ApplyBlock(state sm.State, blockID types.BlockID, block *types.Block) (sm.State, error) {
func (mba *mockBlockApplier) ApplyBlock(state sm.State, blockID types.BlockID, block *types.Block) (sm.State, int64, error) {
state.LastBlockHeight++
return state, nil
return state, 0, nil
}
type mockSwitchIo struct {
@@ -127,7 +127,7 @@ func (sio *mockSwitchIo) hasSwitchedToConsensus() bool {
return sio.switchedToConsensus
}
func (sio *mockSwitchIo) broadcastStatusRequest(height int64) {
func (sio *mockSwitchIo) broadcastStatusRequest(base int64, height int64) {
}
type testReactorParams struct {
@@ -511,7 +511,7 @@ func newReactorStore(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(errors.Wrap(err, "error apply block"))
}

View File

@@ -111,20 +111,22 @@ type scPeer struct {
// updated to Removed when peer is removed
state peerState
base int64 // updated when statusResponse is received
height int64 // updated when statusResponse is received
lastTouched time.Time
lastRate int64 // last receive rate in bytes
}
func (p scPeer) String() string {
return fmt.Sprintf("{state %v, height %d, lastTouched %v, lastRate %d, id %v}",
p.state, p.height, p.lastTouched, p.lastRate, p.peerID)
return fmt.Sprintf("{state %v, base %d, height %d, lastTouched %v, lastRate %d, id %v}",
p.state, p.base, p.height, p.lastTouched, p.lastRate, p.peerID)
}
func newScPeer(peerID p2p.ID) *scPeer {
return &scPeer{
peerID: peerID,
state: peerStateNew,
base: -1,
height: -1,
lastTouched: time.Time{},
}
@@ -280,7 +282,7 @@ func (sc *scheduler) addNewBlocks() {
}
}
func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error {
func (sc *scheduler) setPeerRange(peerID p2p.ID, base int64, height int64) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("cannot find peer %s", peerID)
@@ -295,6 +297,11 @@ func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error {
return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height)
}
if base > height {
return fmt.Errorf("cannot set peer base higher than its height")
}
peer.base = base
peer.height = height
peer.state = peerStateReady
@@ -312,13 +319,13 @@ func (sc *scheduler) getStateAtHeight(height int64) blockState {
}
}
func (sc *scheduler) getPeersAtHeightOrAbove(height int64) []p2p.ID {
func (sc *scheduler) getPeersWithHeight(height int64) []p2p.ID {
peers := make([]p2p.ID, 0)
for _, peer := range sc.peers {
if peer.state != peerStateReady {
continue
}
if peer.height >= height {
if peer.base <= height && peer.height >= height {
peers = append(peers, peer.peerID)
}
}
@@ -395,6 +402,11 @@ func (sc *scheduler) markPending(peerID p2p.ID, height int64, time time.Time) er
height, peerID, peer.height)
}
if height < peer.base {
return fmt.Errorf("cannot request height %d for peer %s with base %d",
height, peerID, peer.base)
}
sc.setStateAtHeight(height, blockStatePending)
sc.pendingBlocks[height] = peerID
sc.pendingTime[height] = time
@@ -463,7 +475,7 @@ func (sc *scheduler) pendingFrom(peerID p2p.ID) []int64 {
}
func (sc *scheduler) selectPeer(height int64) (p2p.ID, error) {
peers := sc.getPeersAtHeightOrAbove(height)
peers := sc.getPeersWithHeight(height)
if len(peers) == 0 {
return "", fmt.Errorf("cannot find peer for height %d", height)
}
@@ -535,8 +547,8 @@ func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, erro
_ = sc.removePeer(event.peerID)
return scPeerError{peerID: event.peerID,
reason: fmt.Errorf("peer %v with height %d claims no block for %d",
event.peerID, peer.height, event.height)}, nil
reason: fmt.Errorf("peer %v with base %d height %d claims no block for %d",
event.peerID, peer.base, peer.height, event.height)}, nil
}
func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) {
@@ -653,7 +665,7 @@ func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) {
}
func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error) {
err := sc.setPeerHeight(event.peerID, event.height)
err := sc.setPeerRange(event.peerID, event.base, event.height)
if err != nil {
return scPeerError{peerID: event.peerID, reason: err}, nil
}

View File

@@ -145,8 +145,8 @@ func TestScMaxHeights(t *testing.T) {
sc: scheduler{
height: 1,
peers: map[p2p.ID]*scPeer{
"P1": {height: -1, state: peerStateNew},
"P2": {height: -1, state: peerStateNew}},
"P1": {base: -1, height: -1, state: peerStateNew},
"P2": {base: -1, height: -1, state: peerStateNew}},
},
wantMax: 0,
},
@@ -194,15 +194,15 @@ func TestScAddPeer(t *testing.T) {
name: "add first peer",
fields: scTestParams{},
args: args{peerID: "P1"},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {base: -1, height: -1, state: peerStateNew}}},
},
{
name: "add second peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}},
fields: scTestParams{peers: map[string]*scPeer{"P1": {base: -1, height: -1, state: peerStateNew}}},
args: args{peerID: "P2"},
wantFields: scTestParams{peers: map[string]*scPeer{
"P1": {height: -1, state: peerStateNew},
"P2": {height: -1, state: peerStateNew}}},
"P1": {base: -1, height: -1, state: peerStateNew},
"P2": {base: -1, height: -1, state: peerStateNew}}},
},
{
name: "attempt to add duplicate peer",
@@ -501,10 +501,11 @@ func TestScRemovePeer(t *testing.T) {
}
}
func TestScSetPeerHeight(t *testing.T) {
func TestScSetPeerRange(t *testing.T) {
type args struct {
peerID p2p.ID
base int64
height int64
}
tests := []struct {
@@ -576,13 +577,37 @@ func TestScSetPeerHeight(t *testing.T) {
peers: map[string]*scPeer{"P2": {height: 10000000000, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
},
{
name: "add peer with base > height should error",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
args: args{peerID: "P1", base: 6, height: 5},
wantFields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
wantErr: true,
},
{
name: "add peer with base == height is fine",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateNew}},
targetPending: 4,
},
args: args{peerID: "P1", base: 6, height: 6},
wantFields: scTestParams{
targetPending: 4,
peers: map[string]*scPeer{"P1": {base: 6, height: 6, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
if err := sc.setPeerHeight(tt.args.peerID, tt.args.height); (err != nil) != tt.wantErr {
err := sc.setPeerRange(tt.args.peerID, tt.args.base, tt.args.height)
if (err != nil) != tt.wantErr {
t.Errorf("setPeerHeight() wantErr %v, error = %v", tt.wantErr, err)
}
wantSc := newTestScheduler(tt.wantFields)
@@ -591,7 +616,7 @@ func TestScSetPeerHeight(t *testing.T) {
}
}
func TestScGetPeersAtHeight(t *testing.T) {
func TestScGetPeersWithHeight(t *testing.T) {
type args struct {
height int64
@@ -648,6 +673,26 @@ func TestScGetPeersAtHeight(t *testing.T) {
args: args{height: 4},
wantResult: []p2p.ID{"P1"},
},
{
name: "one Ready higher peer at base",
fields: scTestParams{
targetPending: 4,
peers: map[string]*scPeer{"P1": {base: 4, height: 20, state: peerStateReady}},
allB: []int64{1, 2, 3, 4},
},
args: args{height: 4},
wantResult: []p2p.ID{"P1"},
},
{
name: "one Ready higher peer with higher base",
fields: scTestParams{
targetPending: 4,
peers: map[string]*scPeer{"P1": {base: 10, height: 20, state: peerStateReady}},
allB: []int64{1, 2, 3, 4},
},
args: args{height: 4},
wantResult: []p2p.ID{},
},
{
name: "multiple mixed peers",
fields: scTestParams{
@@ -669,9 +714,9 @@ func TestScGetPeersAtHeight(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
// getPeersAtHeight should not mutate the scheduler
// getPeersWithHeight should not mutate the scheduler
wantSc := sc
res := sc.getPeersAtHeightOrAbove(tt.args.height)
res := sc.getPeersWithHeight(tt.args.height)
sort.Sort(PeerByID(res))
assert.Equal(t, tt.wantResult, res)
assert.Equal(t, wantSc, sc)
@@ -695,7 +740,7 @@ func TestScMarkPending(t *testing.T) {
wantErr bool
}{
{
name: "attempt mark pending an unknown block",
name: "attempt mark pending an unknown block above height",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}},
allB: []int64{1, 2}},
@@ -705,6 +750,17 @@ func TestScMarkPending(t *testing.T) {
allB: []int64{1, 2}},
wantErr: true,
},
{
name: "attempt mark pending an unknown block below base",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {base: 4, height: 6, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6}},
args: args{peerID: "P1", height: 3, tm: now},
wantFields: scTestParams{
peers: map[string]*scPeer{"P1": {base: 4, height: 6, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6}},
wantErr: true,
},
{
name: "attempt mark pending from non existing peer",
fields: scTestParams{
@@ -1202,6 +1258,16 @@ func TestScSelectPeer(t *testing.T) {
args: args{height: 4},
wantResult: "P1",
},
{
name: "one Ready higher peer with higher base",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {base: 4, height: 6, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6},
},
args: args{height: 3},
wantResult: "",
wantError: true,
},
{
name: "many Ready higher peers with different number of pending requests",
fields: scTestParams{
@@ -1990,7 +2056,7 @@ func TestScHandle(t *testing.T) {
args: args{event: bcAddNewPeer{peerID: "P1"}},
wantEvent: noOpEvent{},
wantSc: &scTestParams{startTime: now, peers: map[string]*scPeer{
"P1": {height: -1, state: peerStateNew}}, height: 1},
"P1": {base: -1, height: -1, state: peerStateNew}}, height: 1},
},
{ // set height of P1
args: args{event: bcStatusResponse{peerID: "P1", time: tick[0], height: 3}},