config: backport the rename of fastsync to blocksync (#9259)

This is largely a cherry-pick of #6755, with additional fixups applied where needed.
This change moves the blockchain package to a package called blocksync. Additionally, it renames the relevant uses of the term `fastsync` to `blocksync`.

closes: #9227 

#### PR checklist

- [ ] Tests written/updated, or no tests needed
- [x] `CHANGELOG_PENDING.md` updated, or no changelog entry needed
- [x] Updated relevant documentation (`docs/`) and code comments, or no
      documentation updates needed
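
For node operators, the visible effect of the rename is in the configuration file. Here is a minimal sketch of the relevant `config.toml` fragment after this change, assuming the backport deprecates the old `fast_sync` key and `[fastsync]` section in favor of their blocksync equivalents:

```toml
# Enable block syncing (formerly fast_sync). The old key is assumed to
# remain accepted-but-deprecated for backwards compatibility.
block_sync = true

[blocksync]
version = "v0"
```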
Author: William Banfield
Date: 2022-08-17 11:19:20 -04:00
Committed by: GitHub
Parent: 3003e05581
Commit: 1069ffc6aa

31 changed files with 235 additions and 197 deletions

blocksync/msgs.go (new file)

@@ -0,0 +1,110 @@
package blocksync
import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
"github.com/tendermint/tendermint/types"
)
const (
// NOTE: keep up to date with bcproto.BlockResponse
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
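// MaxMsgSize is the maximum expected size of an encoded blocksync
// message: a maximal block plus the protobuf framing overhead above.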
MaxMsgSize = types.MaxBlockSizeBytes +
BlockResponseMessagePrefixSize +
BlockResponseMessageFieldKeySize
)
// EncodeMsg encodes a Protobuf message.
func EncodeMsg(pb proto.Message) ([]byte, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
bz, err := proto.Marshal(&msg)
if err != nil {
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
}
return bz, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {
return errors.New("message cannot be nil")
}
switch msg := pb.(type) {
case *bcproto.BlockRequest:
if msg.Height < 0 {
return errors.New("negative Height")
}
case *bcproto.BlockResponse:
_, err := types.BlockFromProto(msg.Block)
if err != nil {
return err
}
case *bcproto.NoBlockResponse:
if msg.Height < 0 {
return errors.New("negative Height")
}
case *bcproto.StatusResponse:
if msg.Base < 0 {
return errors.New("negative Base")
}
if msg.Height < 0 {
return errors.New("negative Height")
}
if msg.Base > msg.Height {
return fmt.Errorf("base %v cannot be greater than height %v", msg.Base, msg.Height)
}
case *bcproto.StatusRequest:
return nil
default:
return fmt.Errorf("unknown message type %T", msg)
}
return nil
}
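
For orientation, a minimal sketch of how these helpers compose, round-tripping a request through `EncodeMsg`, `DecodeMsg`, and `ValidateMsg` (a hypothetical `main`, not part of the change):

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/blocksync"
	bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
)

func main() {
	// Wrap a concrete request in the bcproto.Message envelope and marshal it.
	bz, err := blocksync.EncodeMsg(&bcproto.BlockRequest{Height: 10})
	if err != nil {
		panic(err)
	}

	// Unwrap the envelope back into the concrete message type.
	msg, err := blocksync.DecodeMsg(bz)
	if err != nil {
		panic(err)
	}

	// Apply the same stateless checks the reactor runs on receipt.
	if err := blocksync.ValidateMsg(msg); err != nil {
		panic(err)
	}
	fmt.Printf("round-tripped a %T\n", msg)
}
```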

blocksync/msgs_test.go (new file)

@@ -0,0 +1,126 @@
package blocksync_test
import (
"encoding/hex"
"math"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/blocksync"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
"github.com/tendermint/tendermint/types"
)
func TestBcBlockRequestMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
requestHeight int64
expectErr bool
}{
{"Valid Request Message", 0, false},
{"Valid Request Message", 1, false},
{"Invalid Request Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
request := bcproto.BlockRequest{Height: tc.requestHeight}
assert.Equal(t, tc.expectErr, blocksync.ValidateMsg(&request) != nil, "Validate Basic had an unexpected result")
})
}
}
func TestBcNoBlockResponseMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
nonResponseHeight int64
expectErr bool
}{
{"Valid Non-Response Message", 0, false},
{"Valid Non-Response Message", 1, false},
{"Invalid Non-Response Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
nonResponse := bcproto.NoBlockResponse{Height: tc.nonResponseHeight}
assert.Equal(t, tc.expectErr, blocksync.ValidateMsg(&nonResponse) != nil, "Validate Basic had an unexpected result")
})
}
}
func TestBcStatusRequestMessageValidateBasic(t *testing.T) {
request := bcproto.StatusRequest{}
assert.NoError(t, blocksync.ValidateMsg(&request))
}
func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
testCases := []struct {
testName string
responseHeight int64
expectErr bool
}{
{"Valid Response Message", 0, false},
{"Valid Response Message", 1, false},
{"Invalid Response Message", -1, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
response := bcproto.StatusResponse{Height: tc.responseHeight}
assert.Equal(t, tc.expectErr, blocksync.ValidateMsg(&response) != nil, "Validate Basic had an unexpected result")
})
}
}
// nolint:lll // ignore line length in tests
func TestBlockchainMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
block.Version.Block = 11 // pin the block protocol version so the vectors stay stable
bpb, err := block.ToProto()
require.NoError(t, err)
testCases := []struct {
testName string
bmsg proto.Message
expBytes string
}{
{"BlockRequestMessage", &bcproto.Message{Sum: &bcproto.Message_BlockRequest{
BlockRequest: &bcproto.BlockRequest{Height: 1}}}, "0a020801"},
{"BlockRequestMessage", &bcproto.Message{Sum: &bcproto.Message_BlockRequest{
BlockRequest: &bcproto.BlockRequest{Height: math.MaxInt64}}},
"0a0a08ffffffffffffffff7f"},
{"BlockResponseMessage", &bcproto.Message{Sum: &bcproto.Message_BlockResponse{
BlockResponse: &bcproto.BlockResponse{Block: bpb}}}, "1a700a6e0a5b0a02080b1803220b088092b8c398feffffff012a0212003a20c4da88e876062aa1543400d50d0eaa0dac88096057949cfb7bca7f3a48c04bf96a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855120d0a0b48656c6c6f20576f726c641a00"},
{"NoBlockResponseMessage", &bcproto.Message{Sum: &bcproto.Message_NoBlockResponse{
NoBlockResponse: &bcproto.NoBlockResponse{Height: 1}}}, "12020801"},
{"NoBlockResponseMessage", &bcproto.Message{Sum: &bcproto.Message_NoBlockResponse{
NoBlockResponse: &bcproto.NoBlockResponse{Height: math.MaxInt64}}},
"120a08ffffffffffffffff7f"},
{"StatusRequestMessage", &bcproto.Message{Sum: &bcproto.Message_StatusRequest{
StatusRequest: &bcproto.StatusRequest{}}},
"2200"},
{"StatusResponseMessage", &bcproto.Message{Sum: &bcproto.Message_StatusResponse{
StatusResponse: &bcproto.StatusResponse{Height: 1, Base: 2}}},
"2a0408011002"},
{"StatusResponseMessage", &bcproto.Message{Sum: &bcproto.Message_StatusResponse{
StatusResponse: &bcproto.StatusResponse{Height: math.MaxInt64, Base: math.MaxInt64}}},
"2a1408ffffffffffffffff7f10ffffffffffffffff7f"},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
bz, _ := proto.Marshal(tc.bmsg)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz))
})
}
}
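
The hex vectors can be checked by hand against the protobuf wire format. For example, `BlockRequest{Height: 1}` encodes as `0a020801`:

```text
0a  Message field 1 (block_request), wire type 2 (length-delimited)
02  the embedded BlockRequest is 2 bytes long
08  BlockRequest field 1 (height), wire type 0 (varint)
01  height = 1
```

The `NoBlockResponse` vector `12020801` differs only in the envelope tag (`12` = field 2).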

blocksync/pool.go (new file)

@@ -0,0 +1,647 @@
package blocksync
import (
"errors"
"fmt"
"math"
"sync/atomic"
"time"
flow "github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
/*
e.g., L = latency = 0.1s
P = num peers = 10
FN = num full nodes
BS = 1kB block size
CB = 1 Mbit/s = 128 kB/s
CB/P = 12.8 kB/s per peer
B/S = CB/P/BS = 12.8 blocks/s
12.8 blocks/s * 0.1 s = 1.28 blocks in flight per connection
*/
const (
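// Tuning knobs: how often the requester routine polls for work, and caps
// on outstanding requests, both in total and per peer.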
requestIntervalMS = 2
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at least at that rate, we
// consider them to have timed out and we disconnect.
//
// Assuming a DSL connection (not a good choice): 128 Kbps (upload) ~ 15 KB/s,
// sending data across the Atlantic ~ 7.5 KB/s.
minRecvRate = 7680
// Maximum difference between current and new block's height.
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
)
var peerTimeout = 15 * time.Second // not const so we can override with tests
/*
Peers self-report their heights when we join the block pool.
Starting from our latest pool.height, we request blocks
in sequence from peers that reported higher heights than ours.
Every so often we ask peers what height they're on so we can keep going.
Requests are continuously made for blocks of higher heights until
the limit is reached. If most of the requests have no available peers, and we
are not at peer limits, we can probably switch to the consensus reactor.
*/
// BlockPool keeps track of the block sync peers, block requests and block responses.
type BlockPool struct {
service.BaseService
startTime time.Time
mtx tmsync.Mutex
// block requests
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
// peers
peers map[p2p.ID]*bpPeer
maxPeerHeight int64 // the biggest reported height
// atomic
numPending int32 // number of requests pending assignment or block response
requestsCh chan<- BlockRequest
errorsCh chan<- peerError
}
// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
requesters: make(map[int64]*bpRequester),
height: start,
numPending: 0,
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bp.BaseService = *service.NewBaseService(nil, "BlockPool", bp)
return bp
}
// OnStart implements service.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}
// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
break
}
_, numPending, lenRequesters := pool.GetStatus()
switch {
case numPending >= maxPendingRequests:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
case lenRequesters >= maxTotalRequesters:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
default:
// request for more blocks.
pool.makeNextRequester()
}
}
}
func (pool *BlockPool) removeTimedoutPeers() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, peer := range pool.peers {
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate {
err := errors.New("peer is not sending us data fast enough")
pool.sendError(err, peer.id)
pool.Logger.Error("SendTimeout", "peer", peer.id,
"reason", err,
"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
peer.didTimeout = true
}
}
if peer.didTimeout {
pool.removePeer(peer.id)
}
}
}
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// IsCaughtUp returns true if this node is caught up, false otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()
// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
pool.Logger.Debug("Blockpool has no peers")
return false
}
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
}
// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
}
if r := pool.requesters[pool.height+1]; r != nil {
second = r.getBlock()
}
return
}
// PopRequest pops the first block at pool.height.
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil {
/* The block can disappear at any time, due to removePeer().
if r := pool.requesters[pool.height]; r == nil || r.block == nil {
PanicSanity("PopRequest() requires a valid block")
}
*/
if err := r.Stop(); err != nil {
pool.Logger.Error("Error stopping requester", "err", err)
}
delete(pool.requesters, pool.height)
pool.height++
} else {
panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
}
}
// RedoRequest invalidates the block at the given height,
// removes the peer, and redoes the request via other peers.
// Returns the ID of the removed peer.
func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
pool.mtx.Lock()
defer pool.mtx.Unlock()
request := pool.requesters[height]
peerID := request.getPeerID()
if peerID != p2p.ID("") {
// RemovePeer will redo all requesters associated with this peer.
pool.removePeer(peerID)
}
return peerID
}
// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
requester := pool.requesters[block.Height]
if requester == nil {
pool.Logger.Info(
"peer sent us a block we didn't expect",
"peer",
peerID,
"curHeight",
pool.height,
"blockHeight",
block.Height)
diff := pool.height - block.Height
if diff < 0 {
diff *= -1
}
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
}
return
}
if requester.setBlock(block, peerID) {
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
} else {
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.sendError(errors.New("invalid peer"), peerID)
}
}
// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.maxPeerHeight
}
// SetPeerRange sets the peer's alleged blockchain base and height.
func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
peer := pool.peers[peerID]
if peer != nil {
peer.base = base
peer.height = height
} else {
peer = newBPPeer(pool, peerID, base, height)
peer.setLogger(pool.Logger.With("peer", peerID))
pool.peers[peerID] = peer
}
if height > pool.maxPeerHeight {
pool.maxPeerHeight = height
}
}
// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.removePeer(peerID)
}
func (pool *BlockPool) removePeer(peerID p2p.ID) {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
requester.redo(peerID)
}
}
peer, ok := pool.peers[peerID]
if ok {
if peer.timeout != nil {
peer.timeout.Stop()
}
delete(pool.peers, peerID)
// Find a new peer with the biggest height and update maxPeerHeight if the
// peer's height was the biggest.
if peer.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
}
// If no peers are left, maxPeerHeight is set to 0.
func (pool *BlockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
}
}
pool.maxPeerHeight = max
}
// Pick an available peer whose range (base..height) covers the given height.
// If no such peer is available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, peer := range pool.peers {
if peer.didTimeout {
pool.removePeer(peer.id)
continue
}
if peer.numPending >= maxPendingRequestsPerPeer {
continue
}
if height < peer.base || height > peer.height {
continue
}
peer.incrPending()
return peer
}
return nil
}
func (pool *BlockPool) makeNextRequester() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
nextHeight := pool.height + pool.requestersLen()
if nextHeight > pool.maxPeerHeight {
return
}
request := newBPRequester(pool, nextHeight)
pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1)
err := request.Start()
if err != nil {
request.Logger.Error("Error starting request", "err", err)
}
}
func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}
func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
if !pool.IsRunning() {
return
}
pool.requestsCh <- BlockRequest{height, peerID}
}
func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
if !pool.IsRunning() {
return
}
pool.errorsCh <- peerError{err, peerID}
}
// for debugging purposes
//nolint:unused
func (pool *BlockPool) debug() string {
pool.mtx.Lock()
defer pool.mtx.Unlock()
str := ""
nextHeight := pool.height + pool.requestersLen()
for h := pool.height; h < nextHeight; h++ {
if pool.requesters[h] == nil {
str += fmt.Sprintf("H(%v):X ", h)
} else {
str += fmt.Sprintf("H(%v):", h)
str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
}
}
return str
}
//-------------------------------------
type bpPeer struct {
didTimeout bool
numPending int32
height int64
base int64
pool *BlockPool
id p2p.ID
recvMonitor *flow.Monitor
timeout *time.Timer
logger log.Logger
}
func newBPPeer(pool *BlockPool, peerID p2p.ID, base int64, height int64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
base: base,
height: height,
numPending: 0,
logger: log.NewNopLogger(),
}
return peer
}
func (peer *bpPeer) setLogger(l log.Logger) {
peer.logger = l
}
func (peer *bpPeer) resetMonitor() {
peer.recvMonitor = flow.New(time.Second, time.Second*40)
initialValue := float64(minRecvRate) * math.E
peer.recvMonitor.SetREMA(initialValue)
}
func (peer *bpPeer) resetTimeout() {
if peer.timeout == nil {
peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
} else {
peer.timeout.Reset(peerTimeout)
}
}
func (peer *bpPeer) incrPending() {
if peer.numPending == 0 {
peer.resetMonitor()
peer.resetTimeout()
}
peer.numPending++
}
func (peer *bpPeer) decrPending(recvSize int) {
peer.numPending--
if peer.numPending == 0 {
peer.timeout.Stop()
} else {
peer.recvMonitor.Update(recvSize)
peer.resetTimeout()
}
}
func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock()
err := errors.New("peer did not send us anything")
peer.pool.sendError(err, peer.id)
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
peer.didTimeout = true
}
//-------------------------------------
type bpRequester struct {
service.BaseService
pool *BlockPool
height int64
gotBlockCh chan struct{}
redoCh chan p2p.ID // redo may be sent multiple times; the peer ID identifies duplicate requests
mtx tmsync.Mutex
peerID p2p.ID
block *types.Block
}
func newBPRequester(pool *BlockPool, height int64) *bpRequester {
bpr := &bpRequester{
pool: pool,
height: height,
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan p2p.ID, 1),
peerID: "",
block: nil,
}
bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr)
return bpr
}
func (bpr *bpRequester) OnStart() error {
go bpr.requestRoutine()
return nil
}
// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = block
bpr.mtx.Unlock()
select {
case bpr.gotBlockCh <- struct{}{}:
default:
}
return true
}
func (bpr *bpRequester) getBlock() *types.Block {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
}
func (bpr *bpRequester) getPeerID() p2p.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.peerID
}
// This is called from the requestRoutine, upon redo().
func (bpr *bpRequester) reset() {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}
bpr.peerID = ""
bpr.block = nil
}
// Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo(peerID p2p.ID) {
select {
case bpr.redoCh <- peerID:
default:
}
}
// Responsible for making more requests as necessary.
// Returns only when a block is found (e.g. AddBlock() is called).
func (bpr *bpRequester) requestRoutine() {
OUTER_LOOP:
for {
// Pick a peer to send request to.
var peer *bpPeer
PICK_PEER_LOOP:
for {
if !bpr.IsRunning() || !bpr.pool.IsRunning() {
return
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
// log.Info("No peers available", "height", height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
break PICK_PEER_LOOP
}
bpr.mtx.Lock()
bpr.peerID = peer.id
bpr.mtx.Unlock()
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
WAIT_LOOP:
for {
select {
case <-bpr.pool.Quit():
if err := bpr.Stop(); err != nil {
bpr.Logger.Error("Error stopping requester", "err", err)
}
return
case <-bpr.Quit():
return
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
continue OUTER_LOOP
} else {
continue WAIT_LOOP
}
case <-bpr.gotBlockCh:
// We got a block!
// Continue the for-loop and wait til Quit.
continue WAIT_LOOP
}
}
}
}
// BlockRequest stores a block request identified by the block Height and the
// PeerID responsible for delivering the block.
type BlockRequest struct {
Height int64
PeerID p2p.ID
}
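
For orientation, a minimal in-package sketch of the pool's intended usage; it must live in package `blocksync` because `peerError` is unexported. The reactor below and `pool_test.go` do essentially this, and `sendToPeer` is a hypothetical stand-in for the reactor's `peer.TrySend`:

```go
package blocksync

import (
	"time"

	"github.com/tendermint/tendermint/p2p"
)

func examplePoolUsage(startHeight int64, sendToPeer func(p2p.ID, int64)) error {
	requestsCh := make(chan BlockRequest, 1000)
	errorsCh := make(chan peerError, 1000)

	pool := NewBlockPool(startHeight, requestsCh, errorsCh)
	if err := pool.Start(); err != nil {
		return err
	}

	// Producer side: service block requests emitted by the pool.
	go func() {
		for {
			select {
			case request := <-requestsCh:
				// Ask the chosen peer for the block; the reply comes
				// back through pool.AddBlock.
				sendToPeer(request.PeerID, request.Height)
			case err := <-errorsCh:
				_ = err // a real caller disconnects the offending peer
			case <-pool.Quit():
				return
			}
		}
	}()

	// Consumer side: blocks are consumed strictly in order, two at a
	// time, because block H is verified with the LastCommit carried in
	// block H+1.
	go func() {
		for pool.IsRunning() {
			first, second := pool.PeekTwoBlocks()
			if first == nil || second == nil {
				time.Sleep(10 * time.Millisecond)
				continue
			}
			// ...verify first against second.LastCommit, then:
			pool.PopRequest()
		}
	}()
	return nil
}
```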

blocksync/pool_test.go (new file)

@@ -0,0 +1,242 @@
package blocksync
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
func init() {
peerTimeout = 2 * time.Second
}
type testPeer struct {
id p2p.ID
base int64
height int64
inputChan chan inputData // make sure each peer's data is sequential
}
type inputData struct {
t *testing.T
pool *BlockPool
request BlockRequest
}
func (p testPeer) runInputRoutine() {
go func() {
for input := range p.inputChan {
p.simulateInput(input)
}
}()
}
// A request came in; pretend we received the requested block immediately.
func (p testPeer) simulateInput(input inputData) {
block := &types.Block{Header: types.Header{Height: input.request.Height}}
input.pool.AddBlock(input.request.PeerID, block, 123)
// TODO: uncommenting this creates a race which is detected by:
// https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856
// see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890
// input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height)
}
type testPeers map[p2p.ID]testPeer
func (ps testPeers) start() {
for _, v := range ps {
v.runInputRoutine()
}
}
func (ps testPeers) stop() {
for _, v := range ps {
close(v.inputChan)
}
}
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := p2p.ID(tmrand.Str(12))
height := minHeight + tmrand.Int63n(maxHeight-minHeight)
base := minHeight + int64(i)
if base > height {
base = height
}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
}
return peers
}
func TestBlockPoolBasic(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
if err != nil {
t.Error(err)
}
t.Cleanup(func() {
if err := pool.Stop(); err != nil {
t.Error(err)
}
})
peers.start()
defer peers.stop()
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerRange(peer.id, peer.base, peer.height)
}
}()
// Start a goroutine to pull blocks
go func() {
for {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
}
}
}()
// Pull from channels
for {
select {
case err := <-errorsCh:
t.Error(err)
case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %v", request)
if request.Height == 300 {
return // Done!
}
peers[request.PeerID].inputChan <- inputData{t, pool, request}
}
}
}
func TestBlockPoolTimeout(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
if err != nil {
t.Error(err)
}
t.Cleanup(func() {
if err := pool.Stop(); err != nil {
t.Error(err)
}
})
for _, peer := range peers {
t.Logf("Peer %v", peer.id)
}
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerRange(peer.id, peer.base, peer.height)
}
}()
// Start a goroutine to pull blocks
go func() {
for {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
}
}
}()
// Pull from channels
counter := 0
timedOut := map[p2p.ID]struct{}{}
for {
select {
case err := <-errorsCh:
t.Log(err)
// consider error to be always timeout here
if _, ok := timedOut[err.peerID]; !ok {
counter++
if counter == len(peers) {
return // Done!
}
}
case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %+v", request)
}
}
}
func TestBlockPoolRemovePeer(t *testing.T) {
peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := pool.Stop(); err != nil {
t.Error(err)
}
})
// add peers
for peerID, peer := range peers {
pool.SetPeerRange(peerID, peer.base, peer.height)
}
assert.EqualValues(t, 10, pool.MaxPeerHeight())
// remove not-existing peer
assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) })
// remove peer with biggest height
pool.RemovePeer(p2p.ID("10"))
assert.EqualValues(t, 9, pool.MaxPeerHeight())
// remove all peers
for peerID := range peers {
pool.RemovePeer(peerID)
}
assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

blocksync/reactor.go (new file)

@@ -0,0 +1,436 @@
package blocksync
import (
"fmt"
"reflect"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
const (
// BlocksyncChannel is a channel for blocks and status updates (`BlockStore` height)
BlocksyncChannel = byte(0x40)
trySyncIntervalMS = 10
// stop syncing when last block's time is
// within this much of the system time.
// stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
)
type consensusReactor interface {
// for when we switch from blockchain reactor and block sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
}
type peerError struct {
err error
peerID p2p.ID
}
func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
// Reactor handles long-term catchup syncing.
type Reactor struct {
p2p.BaseReactor
// immutable
initialState sm.State
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
blockSync bool
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
}
// NewReactor returns new reactor instance.
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
blockSync bool) *Reactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
startHeight := store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
pool := NewBlockPool(startHeight, requestsCh, errorsCh)
bcR := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
}
// SetLogger implements service.Service by setting the logger on reactor and pool.
func (bcR *Reactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.pool.Logger = l
}
// OnStart implements service.Service.
func (bcR *Reactor) OnStart() error {
if bcR.blockSync {
err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine(false)
}
return nil
}
// SwitchToBlockSync is called by the state sync reactor when switching to block sync.
func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
bcR.blockSync = true
bcR.initialState = state
bcR.pool.height = state.LastBlockHeight + 1
err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine(true)
return nil
}
// OnStop implements service.Service.
func (bcR *Reactor) OnStop() {
if bcR.blockSync {
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
}
}
// GetChannels implements Reactor
func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlocksyncChannel,
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: MaxMsgSize,
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(BlocksyncChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerRange
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.pool.RemovePeer(peer.ID())
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
bl, err := block.ToProto()
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}
return src.TrySend(BlocksyncChannel, msgBytes)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(BlocksyncChannel, msgBytes)
}
// Receive implements Reactor by handling the message types below.
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
defer trySyncTicker.Stop()
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
defer switchToConsensusTicker.Stop()
blocksSynced := uint64(0)
chainID := bcR.initialState.ChainID
state := bcR.initialState
lastHundred := time.Now()
lastRate := 0.0
didProcessCh := make(chan struct{}, 1)
go func() {
for {
select {
case <-bcR.Quit():
return
case <-bcR.pool.Quit():
return
case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil {
continue
}
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}
queued := peer.TrySend(BlocksyncChannel, msgBytes)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
case err := <-bcR.errorsCh:
peer := bcR.Switch.Peers().Get(err.peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
}
case <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest() // nolint: errcheck
}
}
}()
FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
}
// else {
// should only happen during testing
// }
break FOR_LOOP
}
case <-trySyncTicker.C: // chan time
select {
case didProcessCh <- struct{}{}:
default:
}
case <-didProcessCh:
// NOTE: It is a subtle mistake to process more than a single block
// at a time (e.g. 10) here, because we only TrySend 1 request per
// loop. The ratio mismatch can result in starving of blocks, a
// sudden burst of requests and responses, and repeat.
// Consequently, it is better to split these routines rather than
// coupling them as it's written here. TODO uncouple from request
// routine.
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
// bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
if first == nil || second == nil {
// We need both to sync the first block.
continue FOR_LOOP
} else {
// Try again quickly next loop.
didProcessCh <- struct{}{}
}
firstParts := first.MakePartSet(types.BlockPartSizeBytes)
firstPartSetHeader := firstParts.Header()
firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := state.Validators.VerifyCommitLight(
chainID, firstID, first.Height, second.LastCommit)
if err == nil {
// validate the block before we persist it
err = bcR.blockExec.ValidateBlock(state, first)
}
if err != nil {
bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer, fmt.Errorf("Reactor validation error: %v", err))
}
peerID2 := bcR.pool.RedoRequest(second.Height)
peer2 := bcR.Switch.Peers().Get(peerID2)
if peer2 != nil && peer2 != peer {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer2, fmt.Errorf("Reactor validation error: %v", err))
}
continue FOR_LOOP
}
bcR.pool.PopRequest()
// TODO: batch saves so we don't persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Block Sync Rate", "height", bcR.pool.height,
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}
continue FOR_LOOP
case <-bcR.Quit():
break FOR_LOOP
}
}
}
// BroadcastStatusRequest broadcasts a StatusRequest, asking peers for their `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
bm, err := EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}
bcR.Switch.Broadcast(BlocksyncChannel, bm)
return nil
}
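
Wiring the reactor into a node is unchanged apart from the package name. A hypothetical sketch (`addBlocksyncReactor` and its surroundings are assumptions; the `"BLOCKCHAIN"` moniker matches what the tests below register):

```go
package node

import (
	"github.com/tendermint/tendermint/blocksync"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
)

func addBlocksyncReactor(sw *p2p.Switch, logger log.Logger,
	state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore) {
	// The reactor keeps its historical "BLOCKCHAIN" moniker on the switch;
	// only the package and channel names changed in the rename.
	bcReactor := blocksync.NewReactor(state.Copy(), blockExec, blockStore, true /* blockSync */)
	bcReactor.SetLogger(logger.With("module", "blockchain"))
	sw.AddReactor("BLOCKCHAIN", bcReactor)
}
```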

blocksync/reactor_test.go (new file)

@@ -0,0 +1,328 @@
package blocksync
import (
"fmt"
"os"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool/mock"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
var config *cfg.Config
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: config.ChainID(),
Validators: validators,
}, privValidators
}
type ReactorPair struct {
reactor *Reactor
app proxy.AppConns
}
func newReactor(
logger log.Logger,
genDoc *types.GenesisDoc,
privVals []types.PrivValidator,
maxBlockHeight int64) ReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
err := proxyApp.Start()
if err != nil {
panic(fmt.Errorf("error start app: %w", err))
}
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(blockDB)
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
panic(fmt.Errorf("error constructing state from genesis file: %w", err))
}
// Make the Reactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
db := dbm.NewMemDB()
stateStore = sm.NewStore(db)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mock.Mempool{}, sm.EmptyEvidencePool{})
if err = stateStore.Save(state); err != nil {
panic(err)
}
// let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
vote, err := types.MakeVote(
lastBlock.Header.Height,
lastBlockMeta.BlockID,
state.Validators,
privVals[0],
lastBlock.Header.ChainID,
time.Now(),
)
if err != nil {
panic(err)
}
lastCommit = types.NewCommit(vote.Height, vote.Round,
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
}
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
return ReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(65)
reactorPairs := make([]ReactorPair, 2)
reactorPairs[0] = newReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newReactor(log.TestingLogger(), genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
err = r.app.Stop()
require.NoError(t, err)
}
}()
tests := []struct {
height int64
existent bool
}{
{maxBlockHeight + 2, false},
{10, true},
{1, true},
{100, false},
}
for {
if reactorPairs[1].reactor.pool.IsCaughtUp() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
if tt.existent {
assert.True(t, block != nil)
} else {
assert.True(t, block == nil)
}
}
}
// NOTE: This is too hard to test without
// an easy way to add a test peer to the switch,
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn, but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
// Other chain needs a different validator set
otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
otherChain := newReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)
defer func() {
err := otherChain.reactor.Stop()
require.Error(t, err)
err = otherChain.app.Stop()
require.NoError(t, err)
}()
reactorPairs := make([]ReactorPair, 4)
reactorPairs[0] = newReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newReactor(log.TestingLogger(), genDoc, privVals, 0)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
err = r.app.Stop()
require.NoError(t, err)
}
}()
for {
time.Sleep(1 * time.Second)
caughtUp := true
for _, r := range reactorPairs {
if !r.reactor.pool.IsCaughtUp() {
caughtUp = false
}
}
if caughtUp {
break
}
}
// at this time, reactors[0-3] is the newest
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
// Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data
// race, but can't be easily avoided.
reactorPairs[3].reactor.store = otherChain.reactor.store
lastReactorPair := newReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
}
//----------------------------------------------
// utility funcs
func makeTxs(height int64) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
}
return txs
}
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block
}
type testApp struct {
abci.BaseApplication
}
var _ abci.Application = (*testApp)(nil)
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{}
}
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
return abci.ResponseBeginBlock{}
}
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
return abci.ResponseEndBlock{}
}
func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}
func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}
func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
return
}