mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-12 23:01:30 +00:00
Compare commits
42 Commits
dependabot
...
bez/p2p-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b94ca8308 | ||
|
|
dea48ca7e0 | ||
|
|
630307096b | ||
|
|
dd15fbd502 | ||
|
|
de2ee57b16 | ||
|
|
0ffbafac0a | ||
|
|
5dd1da9ebf | ||
|
|
c5a126fe9b | ||
|
|
dffc29a5b2 | ||
|
|
143fa93d76 | ||
|
|
2201204418 | ||
|
|
7aff59cd86 | ||
|
|
a7c624eb2d | ||
|
|
22536ba3cf | ||
|
|
0b36cc01e1 | ||
|
|
1f88c2a31c | ||
|
|
4a596bfd72 | ||
|
|
96bc96aa40 | ||
|
|
2e49deba49 | ||
|
|
67c5516810 | ||
|
|
4b09bf9695 | ||
|
|
728c1366cc | ||
|
|
42049faed6 | ||
|
|
95b08bb775 | ||
|
|
3ac6a6921e | ||
|
|
1fcc3973a0 | ||
|
|
31fac447a4 | ||
|
|
88fa8fe348 | ||
|
|
9039b6cd05 | ||
|
|
bd42353ee5 | ||
|
|
1463bc0be6 | ||
|
|
f021c7df4d | ||
|
|
85569ebb60 | ||
|
|
952cfaa174 | ||
|
|
537464d26a | ||
|
|
faa0e09f17 | ||
|
|
7a639e5297 | ||
|
|
c978c8f4ee | ||
|
|
bbf126a79d | ||
|
|
a7f935c93a | ||
|
|
c111125b9f | ||
|
|
b48a7f4d7e |
@@ -1,12 +1,6 @@
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -18,91 +12,3 @@ const (
|
||||
BlockResponseMessagePrefixSize +
|
||||
BlockResponseMessageFieldKeySize
|
||||
)
|
||||
|
||||
// EncodeMsg encodes a Protobuf message
|
||||
func EncodeMsg(pb proto.Message) ([]byte, error) {
|
||||
msg := bcproto.Message{}
|
||||
|
||||
switch pb := pb.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
|
||||
case *bcproto.BlockResponse:
|
||||
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
|
||||
case *bcproto.NoBlockResponse:
|
||||
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
|
||||
case *bcproto.StatusRequest:
|
||||
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
|
||||
case *bcproto.StatusResponse:
|
||||
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", pb)
|
||||
}
|
||||
|
||||
bz, err := proto.Marshal(&msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
|
||||
}
|
||||
|
||||
return bz, nil
|
||||
}
|
||||
|
||||
// DecodeMsg decodes a Protobuf message.
|
||||
func DecodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &bcproto.Message{}
|
||||
|
||||
err := proto.Unmarshal(bz, pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *bcproto.Message_BlockRequest:
|
||||
return msg.BlockRequest, nil
|
||||
case *bcproto.Message_BlockResponse:
|
||||
return msg.BlockResponse, nil
|
||||
case *bcproto.Message_NoBlockResponse:
|
||||
return msg.NoBlockResponse, nil
|
||||
case *bcproto.Message_StatusRequest:
|
||||
return msg.StatusRequest, nil
|
||||
case *bcproto.Message_StatusResponse:
|
||||
return msg.StatusResponse, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
// ValidateMsg validates a message.
|
||||
func ValidateMsg(pb proto.Message) error {
|
||||
if pb == nil {
|
||||
return errors.New("message cannot be nil")
|
||||
}
|
||||
|
||||
switch msg := pb.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
if msg.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
case *bcproto.BlockResponse:
|
||||
// validate basic is called later when converting from proto
|
||||
return nil
|
||||
case *bcproto.NoBlockResponse:
|
||||
if msg.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
case *bcproto.StatusResponse:
|
||||
if msg.Base < 0 {
|
||||
return errors.New("negative Base")
|
||||
}
|
||||
if msg.Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if msg.Base > msg.Height {
|
||||
return fmt.Errorf("base %v cannot be greater than height %v", msg.Base, msg.Height)
|
||||
}
|
||||
case *bcproto.StatusRequest:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("unknown message type %T", msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ eg, L = latency = 0.1s
|
||||
const (
|
||||
requestIntervalMS = 2
|
||||
maxTotalRequesters = 600
|
||||
maxPeerErrBuffer = 1000
|
||||
maxPendingRequests = maxTotalRequesters
|
||||
maxPendingRequestsPerPeer = 20
|
||||
|
||||
@@ -180,6 +181,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
|
||||
if len(pool.peers) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// NOTE: we use maxPeerHeight - 1 because to sync block H requires block H+1
|
||||
// to verify the LastCommit.
|
||||
return pool.height >= (pool.maxPeerHeight - 1)
|
||||
|
||||
@@ -2,11 +2,13 @@ package v0
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
@@ -14,18 +16,39 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
_ service.Service = (*Reactor)(nil)
|
||||
|
||||
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
|
||||
BlockchainChannel: {
|
||||
MsgType: new(bcproto.Message),
|
||||
Descriptor: &p2p.ChannelDescriptor{
|
||||
ID: byte(BlockchainChannel),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 1000,
|
||||
RecvBufferCapacity: 50 * 4096,
|
||||
RecvMessageCapacity: bc.MaxMsgSize,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
|
||||
BlockchainChannel = byte(0x40)
|
||||
// BlockchainChannel is a channel for blocks and status updates
|
||||
BlockchainChannel = p2p.ChannelID(0x40)
|
||||
|
||||
trySyncIntervalMS = 10
|
||||
|
||||
// stop syncing when last block's time is
|
||||
// within this much of the system time.
|
||||
// stopSyncingDurationMinutes = 10
|
||||
|
||||
// ask for best height every 10s
|
||||
statusUpdateIntervalSeconds = 10
|
||||
|
||||
// check if we should switch to consensus reactor
|
||||
switchToConsensusIntervalSeconds = 1
|
||||
|
||||
@@ -34,8 +57,8 @@ const (
|
||||
)
|
||||
|
||||
type consensusReactor interface {
|
||||
// for when we switch from blockchain reactor and fast sync to
|
||||
// the consensus machine
|
||||
// For when we switch from blockchain reactor and fast sync to the consensus
|
||||
// machine.
|
||||
SwitchToConsensus(state sm.State, skipWAL bool)
|
||||
}
|
||||
|
||||
@@ -49,339 +72,447 @@ func (e peerError) Error() string {
|
||||
}
|
||||
|
||||
// BlockchainReactor handles long-term catchup syncing.
|
||||
type BlockchainReactor struct {
|
||||
p2p.BaseReactor
|
||||
type Reactor struct {
|
||||
service.BaseService
|
||||
|
||||
// immutable
|
||||
initialState sm.State
|
||||
|
||||
blockExec *sm.BlockExecutor
|
||||
store *store.BlockStore
|
||||
pool *BlockPool
|
||||
fastSync bool
|
||||
blockExec *sm.BlockExecutor
|
||||
store *store.BlockStore
|
||||
pool *BlockPool
|
||||
consReactor consensusReactor
|
||||
fastSync bool
|
||||
|
||||
blockchainCh *p2p.Channel
|
||||
peerUpdates *p2p.PeerUpdatesCh
|
||||
closeCh chan struct{}
|
||||
|
||||
requestsCh <-chan BlockRequest
|
||||
errorsCh <-chan peerError
|
||||
|
||||
// poolWG is used to synchronize the graceful shutdown of the poolRoutine and
|
||||
// requestRoutine spawned goroutines when stopping the reactor and before
|
||||
// stopping the p2p Channel(s).
|
||||
poolWG sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewBlockchainReactor returns new reactor instance.
|
||||
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
|
||||
fastSync bool) *BlockchainReactor {
|
||||
|
||||
// NewReactor returns new reactor instance.
|
||||
func NewReactor(
|
||||
logger log.Logger,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
store *store.BlockStore,
|
||||
consReactor consensusReactor,
|
||||
blockchainCh *p2p.Channel,
|
||||
peerUpdates *p2p.PeerUpdatesCh,
|
||||
fastSync bool,
|
||||
) *Reactor {
|
||||
if state.LastBlockHeight != store.Height() {
|
||||
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
|
||||
store.Height()))
|
||||
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
|
||||
}
|
||||
|
||||
requestsCh := make(chan BlockRequest, maxTotalRequesters)
|
||||
|
||||
const capacity = 1000 // must be bigger than peers count
|
||||
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
|
||||
|
||||
startHeight := store.Height() + 1
|
||||
if startHeight == 1 {
|
||||
startHeight = state.InitialHeight
|
||||
}
|
||||
pool := NewBlockPool(startHeight, requestsCh, errorsCh)
|
||||
|
||||
bcR := &BlockchainReactor{
|
||||
requestsCh := make(chan BlockRequest, maxTotalRequesters)
|
||||
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
|
||||
|
||||
r := &Reactor{
|
||||
initialState: state,
|
||||
blockExec: blockExec,
|
||||
store: store,
|
||||
pool: pool,
|
||||
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
|
||||
consReactor: consReactor,
|
||||
fastSync: fastSync,
|
||||
requestsCh: requestsCh,
|
||||
errorsCh: errorsCh,
|
||||
blockchainCh: blockchainCh,
|
||||
peerUpdates: peerUpdates,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
|
||||
return bcR
|
||||
|
||||
r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
|
||||
return r
|
||||
}
|
||||
|
||||
// SetLogger implements service.Service by setting the logger on reactor and pool.
|
||||
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
|
||||
bcR.BaseService.Logger = l
|
||||
bcR.pool.Logger = l
|
||||
}
|
||||
|
||||
// OnStart implements service.Service.
|
||||
func (bcR *BlockchainReactor) OnStart() error {
|
||||
if bcR.fastSync {
|
||||
err := bcR.pool.Start()
|
||||
if err != nil {
|
||||
// OnStart starts separate go routines for each p2p Channel and listens for
|
||||
// envelopes on each. In addition, it also listens for peer updates and handles
|
||||
// messages on that p2p channel accordingly. The caller must be sure to execute
|
||||
// OnStop to ensure the outbound p2p Channels are closed.
|
||||
//
|
||||
// If fastSync is enabled, we also start the pool and the pool processing
|
||||
// goroutine. If the pool fails to start, an error is returned.
|
||||
func (r *Reactor) OnStart() error {
|
||||
if r.fastSync {
|
||||
if err := r.pool.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
go bcR.poolRoutine(false)
|
||||
|
||||
go r.poolRoutine(false)
|
||||
}
|
||||
|
||||
go r.processBlockchainCh()
|
||||
go r.processPeerUpdates()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
|
||||
func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
|
||||
bcR.fastSync = true
|
||||
bcR.initialState = state
|
||||
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
|
||||
// blocking until they all exit.
|
||||
func (r *Reactor) OnStop() {
|
||||
if r.fastSync {
|
||||
if err := r.pool.Stop(); err != nil {
|
||||
r.Logger.Error("failed to stop pool", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
bcR.pool.height = state.LastBlockHeight + 1
|
||||
err := bcR.pool.Start()
|
||||
if err != nil {
|
||||
// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
|
||||
r.poolWG.Wait()
|
||||
|
||||
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
|
||||
// p2p Channels should execute Close().
|
||||
close(r.closeCh)
|
||||
|
||||
// Wait for all p2p Channels to be closed before returning. This ensures we
|
||||
// can easily reason about synchronization of all p2p Channels and ensure no
|
||||
// panics will occur.
|
||||
<-r.blockchainCh.Done()
|
||||
<-r.peerUpdates.Done()
|
||||
}
|
||||
|
||||
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
|
||||
// Otherwise, we'll respond saying we do not have it.
|
||||
func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) {
|
||||
block := r.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
blockProto, err := block.ToProto()
|
||||
if err != nil {
|
||||
r.Logger.Error("failed to convert msg to protobuf", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
r.blockchainCh.Out() <- p2p.Envelope{
|
||||
To: peerID,
|
||||
Message: &bcproto.BlockResponse{Block: blockProto},
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
|
||||
r.blockchainCh.Out() <- p2p.Envelope{
|
||||
To: peerID,
|
||||
Message: &bcproto.NoBlockResponse{Height: msg.Height},
|
||||
}
|
||||
}
|
||||
|
||||
// handleBlockchainMessage handles enevelopes sent from peers on the
|
||||
// BlockchainChannel. It returns an error only if the Envelope.Message is unknown
|
||||
// for this channel. This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
r.respondToPeer(msg, envelope.From)
|
||||
|
||||
case *bcproto.BlockResponse:
|
||||
block, err := types.BlockFromProto(msg.Block)
|
||||
if err != nil {
|
||||
r.Logger.Error("failed to convert block from proto", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
protoMsg := new(bcproto.Message)
|
||||
if err := protoMsg.Wrap(msg); err != nil {
|
||||
r.Logger.Error("failed to wrap proto message", "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
bz, err := proto.Marshal(protoMsg)
|
||||
if err != nil {
|
||||
r.Logger.Error("failed to proto encode message", "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
r.pool.AddBlock(envelope.From, block, len(bz))
|
||||
|
||||
case *bcproto.StatusRequest:
|
||||
r.blockchainCh.Out() <- p2p.Envelope{
|
||||
To: envelope.From,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Height: r.store.Height(),
|
||||
Base: r.store.Base(),
|
||||
},
|
||||
}
|
||||
|
||||
case *bcproto.StatusResponse:
|
||||
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
|
||||
|
||||
case *bcproto.NoBlockResponse:
|
||||
r.Logger.Debug(
|
||||
"peer does not have the requested block",
|
||||
"height", msg.Height,
|
||||
"peer", envelope.From,
|
||||
)
|
||||
|
||||
default:
|
||||
r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From)
|
||||
return fmt.Errorf("received unknown message: %T", msg)
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
r.Logger.Error("recovering from processing message panic", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
|
||||
|
||||
switch chID {
|
||||
case BlockchainChannel:
|
||||
err = r.handleBlockchainMessage(envelope)
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// processBlockchainCh initiates a blocking process where we listen for and handle
|
||||
// envelopes on the BlockchainChannel. Any error encountered during message
|
||||
// execution will result in a PeerError being sent on the BlockchainChannel. When
|
||||
// the reactor is stopped, we will catch the signal and close the p2p Channel
|
||||
// gracefully.
|
||||
func (r *Reactor) processBlockchainCh() {
|
||||
defer r.blockchainCh.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case envelope := <-r.blockchainCh.In():
|
||||
if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil {
|
||||
r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err)
|
||||
r.blockchainCh.Error() <- p2p.PeerError{
|
||||
PeerID: envelope.From,
|
||||
Err: err,
|
||||
Severity: p2p.PeerErrorSeverityLow,
|
||||
}
|
||||
}
|
||||
|
||||
case <-r.closeCh:
|
||||
r.Logger.Debug("stopped listening on blockchain channel; closing...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
|
||||
// handle the PeerUpdate or if a panic is recovered.
|
||||
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
|
||||
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
|
||||
|
||||
// XXX: Pool#RedoRequest can sometimes give us an empty peer
|
||||
if len(peerUpdate.PeerID) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
switch peerUpdate.Status {
|
||||
case p2p.PeerStatusNew, p2p.PeerStatusUp:
|
||||
// send a status update the newly added peer
|
||||
r.blockchainCh.Out() <- p2p.Envelope{
|
||||
To: peerUpdate.PeerID,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Base: r.store.Base(),
|
||||
Height: r.store.Height(),
|
||||
},
|
||||
}
|
||||
|
||||
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
|
||||
r.pool.RemovePeer(peerUpdate.PeerID)
|
||||
}
|
||||
}
|
||||
|
||||
// processPeerUpdates initiates a blocking process where we listen for and handle
|
||||
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
||||
// close the p2p PeerUpdatesCh gracefully.
|
||||
func (r *Reactor) processPeerUpdates() {
|
||||
defer r.peerUpdates.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case peerUpdate := <-r.peerUpdates.Updates():
|
||||
r.processPeerUpdate(peerUpdate)
|
||||
|
||||
case <-r.closeCh:
|
||||
r.Logger.Debug("stopped listening on peer updates channel; closing...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SwitchToFastSync is called by the state sync reactor when switching to fast
|
||||
// sync.
|
||||
func (r *Reactor) SwitchToFastSync(state sm.State) error {
|
||||
r.fastSync = true
|
||||
r.initialState = state
|
||||
r.pool.height = state.LastBlockHeight + 1
|
||||
|
||||
if err := r.pool.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
go bcR.poolRoutine(true)
|
||||
|
||||
go r.poolRoutine(true)
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service.
|
||||
func (bcR *BlockchainReactor) OnStop() {
|
||||
if bcR.fastSync {
|
||||
if err := bcR.pool.Stop(); err != nil {
|
||||
bcR.Logger.Error("Error stopping pool", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
func (r *Reactor) requestRoutine() {
|
||||
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
|
||||
defer statusUpdateTicker.Stop()
|
||||
|
||||
// GetChannels implements Reactor
|
||||
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
return []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: BlockchainChannel,
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 1000,
|
||||
RecvBufferCapacity: 50 * 4096,
|
||||
RecvMessageCapacity: bc.MaxMsgSize,
|
||||
},
|
||||
}
|
||||
}
|
||||
r.poolWG.Add(1)
|
||||
defer r.poolWG.Done()
|
||||
|
||||
// AddPeer implements Reactor by sending our state to peer.
|
||||
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
|
||||
Base: bcR.store.Base(),
|
||||
Height: bcR.store.Height()})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
_ = peer.Send(BlockchainChannel, msgBytes)
|
||||
// it's OK if send fails. will try later in poolRoutine
|
||||
|
||||
// peer is added to the pool once we receive the first
|
||||
// bcStatusResponseMessage from the peer and call pool.SetPeerRange
|
||||
}
|
||||
|
||||
// RemovePeer implements Reactor by removing peer from the pool.
|
||||
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
bcR.pool.RemovePeer(peer.ID())
|
||||
}
|
||||
|
||||
// respondToPeer loads a block and sends it to the requesting peer,
|
||||
// if we have it. Otherwise, we'll respond saying we don't have it.
|
||||
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
|
||||
src p2p.Peer) (queued bool) {
|
||||
|
||||
block := bcR.store.LoadBlock(msg.Height)
|
||||
if block != nil {
|
||||
bl, err := block.ToProto()
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bl})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not marshal msg", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return src.TrySend(BlockchainChannel, msgBytes)
|
||||
}
|
||||
|
||||
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
|
||||
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return src.TrySend(BlockchainChannel, msgBytes)
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling 4 types of messages (look below).
|
||||
// XXX: do not call any methods that can block or incur heavy processing.
|
||||
// https://github.com/tendermint/tendermint/issues/2888
|
||||
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
logger := bcR.Logger.With("src", src, "chId", chID)
|
||||
|
||||
msg, err := bc.DecodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
logger.Error("Error decoding message", "err", err)
|
||||
bcR.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = bc.ValidateMsg(msg); err != nil {
|
||||
logger.Error("Peer sent us invalid msg", "msg", msg, "err", err)
|
||||
bcR.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug("Receive", "msg", msg)
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
bcR.respondToPeer(msg, src)
|
||||
case *bcproto.BlockResponse:
|
||||
bi, err := types.BlockFromProto(msg.Block)
|
||||
if err != nil {
|
||||
logger.Error("Block content is invalid", "err", err)
|
||||
bcR.Switch.StopPeerForError(src, err)
|
||||
for {
|
||||
select {
|
||||
case <-r.Quit():
|
||||
return
|
||||
}
|
||||
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
|
||||
case *bcproto.StatusRequest:
|
||||
// Send peer our state.
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
|
||||
Height: bcR.store.Height(),
|
||||
Base: bcR.store.Base(),
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("could not convert msg to protobut", "err", err)
|
||||
|
||||
case <-r.pool.Quit():
|
||||
return
|
||||
|
||||
case request := <-r.requestsCh:
|
||||
r.blockchainCh.Out() <- p2p.Envelope{
|
||||
To: request.PeerID,
|
||||
Message: &bcproto.BlockRequest{Height: request.Height},
|
||||
}
|
||||
|
||||
case pErr := <-r.errorsCh:
|
||||
r.blockchainCh.Error() <- p2p.PeerError{
|
||||
PeerID: pErr.peerID,
|
||||
Err: pErr.err,
|
||||
Severity: p2p.PeerErrorSeverityLow,
|
||||
}
|
||||
|
||||
case <-statusUpdateTicker.C:
|
||||
r.poolWG.Add(1)
|
||||
|
||||
go func() {
|
||||
defer r.poolWG.Done()
|
||||
|
||||
r.blockchainCh.Out() <- p2p.Envelope{
|
||||
Broadcast: true,
|
||||
Message: &bcproto.StatusRequest{},
|
||||
}
|
||||
}()
|
||||
}
|
||||
src.TrySend(BlockchainChannel, msgBytes)
|
||||
case *bcproto.StatusResponse:
|
||||
// Got a peer status. Unverified.
|
||||
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
|
||||
case *bcproto.NoBlockResponse:
|
||||
logger.Debug("Peer does not have requested block", "height", msg.Height)
|
||||
default:
|
||||
logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
}
|
||||
|
||||
// Handle messages from the poolReactor telling the reactor what to do.
|
||||
// poolRoutine handles messages from the poolReactor telling the reactor what to
|
||||
// do.
|
||||
//
|
||||
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
|
||||
func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
|
||||
func (r *Reactor) poolRoutine(stateSynced bool) {
|
||||
var (
|
||||
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
|
||||
statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second)
|
||||
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
|
||||
|
||||
blocksSynced = uint64(0)
|
||||
|
||||
chainID = bcR.initialState.ChainID
|
||||
state = bcR.initialState
|
||||
chainID = r.initialState.ChainID
|
||||
state = r.initialState
|
||||
|
||||
lastHundred = time.Now()
|
||||
lastRate = 0.0
|
||||
|
||||
didProcessCh = make(chan struct{}, 1)
|
||||
)
|
||||
|
||||
defer trySyncTicker.Stop()
|
||||
defer statusUpdateTicker.Stop()
|
||||
defer switchToConsensusTicker.Stop()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
go r.requestRoutine()
|
||||
|
||||
case <-bcR.Quit():
|
||||
return
|
||||
|
||||
case <-bcR.pool.Quit():
|
||||
return
|
||||
|
||||
case request := <-bcR.requestsCh:
|
||||
peer := bcR.Switch.Peers().Get(request.PeerID)
|
||||
if peer == nil {
|
||||
bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID)
|
||||
continue
|
||||
}
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert BlockRequest to proto", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
queued := peer.TrySend(BlockchainChannel, msgBytes)
|
||||
if !queued {
|
||||
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
|
||||
}
|
||||
|
||||
case err := <-bcR.errorsCh:
|
||||
peer := bcR.Switch.Peers().Get(err.peerID)
|
||||
if peer != nil {
|
||||
bcR.Switch.StopPeerForError(peer, err)
|
||||
}
|
||||
|
||||
case <-statusUpdateTicker.C:
|
||||
// ask for status updates
|
||||
go bcR.BroadcastStatusRequest()
|
||||
}
|
||||
}
|
||||
}()
|
||||
r.poolWG.Add(1)
|
||||
defer r.poolWG.Done()
|
||||
|
||||
FOR_LOOP:
|
||||
for {
|
||||
select {
|
||||
|
||||
case <-switchToConsensusTicker.C:
|
||||
var (
|
||||
height, numPending, lenRequesters = bcR.pool.GetStatus()
|
||||
outbound, inbound, _ = bcR.Switch.NumPeers()
|
||||
lastAdvance = bcR.pool.LastAdvance()
|
||||
height, numPending, lenRequesters = r.pool.GetStatus()
|
||||
lastAdvance = r.pool.LastAdvance()
|
||||
)
|
||||
|
||||
bcR.Logger.Debug("Consensus ticker",
|
||||
"numPending", numPending,
|
||||
"total", lenRequesters)
|
||||
r.Logger.Debug(
|
||||
"consensus ticker",
|
||||
"num_pending", numPending,
|
||||
"total", lenRequesters,
|
||||
"height", height,
|
||||
)
|
||||
|
||||
switch {
|
||||
case bcR.pool.IsCaughtUp():
|
||||
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
|
||||
case r.pool.IsCaughtUp():
|
||||
r.Logger.Info("switching to consensus reactor", "height", height)
|
||||
|
||||
case time.Since(lastAdvance) > syncTimeout:
|
||||
bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance))
|
||||
r.Logger.Error("no progress since last advance", "last_advance", lastAdvance)
|
||||
|
||||
default:
|
||||
bcR.Logger.Info("Not caught up yet",
|
||||
"height", height, "max_peer_height", bcR.pool.MaxPeerHeight(),
|
||||
"num_peers", outbound+inbound,
|
||||
"timeout_in", syncTimeout-time.Since(lastAdvance))
|
||||
r.Logger.Info(
|
||||
"not caught up yet",
|
||||
"height", height,
|
||||
"max_peer_height", r.pool.MaxPeerHeight(),
|
||||
"timeout_in", syncTimeout-time.Since(lastAdvance),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := bcR.pool.Stop(); err != nil {
|
||||
bcR.Logger.Error("Error stopping pool", "err", err)
|
||||
if err := r.pool.Stop(); err != nil {
|
||||
r.Logger.Error("failed to stop pool", "err", err)
|
||||
}
|
||||
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
|
||||
if ok {
|
||||
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
|
||||
|
||||
if r.consReactor != nil {
|
||||
r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
|
||||
}
|
||||
|
||||
break FOR_LOOP
|
||||
|
||||
case <-trySyncTicker.C: // chan time
|
||||
case <-trySyncTicker.C:
|
||||
select {
|
||||
case didProcessCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
case <-didProcessCh:
|
||||
// NOTE: It is a subtle mistake to process more than a single block
|
||||
// at a time (e.g. 10) here, because we only TrySend 1 request per
|
||||
// loop. The ratio mismatch can result in starving of blocks, a
|
||||
// sudden burst of requests and responses, and repeat.
|
||||
// Consequently, it is better to split these routines rather than
|
||||
// coupling them as it's written here. TODO uncouple from request
|
||||
// routine.
|
||||
// NOTE: It is a subtle mistake to process more than a single block at a
|
||||
// time (e.g. 10) here, because we only send one BlockRequest per loop
|
||||
// iteration. The ratio mismatch can result in starving of blocks, i.e. a
|
||||
// sudden burst of requests and responses, and repeat. Consequently, it is
|
||||
// better to split these routines rather than coupling them as it is
|
||||
// written here.
|
||||
//
|
||||
// TODO: Uncouple from request routine.
|
||||
|
||||
// See if there are any blocks to sync.
|
||||
first, second := bcR.pool.PeekTwoBlocks()
|
||||
// bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
|
||||
// see if there are any blocks to sync
|
||||
first, second := r.pool.PeekTwoBlocks()
|
||||
if first == nil || second == nil {
|
||||
// We need both to sync the first block.
|
||||
// we need both to sync the first block
|
||||
continue FOR_LOOP
|
||||
} else {
|
||||
// Try again quickly next loop.
|
||||
// try again quickly next loop
|
||||
didProcessCh <- struct{}{}
|
||||
}
|
||||
|
||||
@@ -391,71 +522,75 @@ FOR_LOOP:
|
||||
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
|
||||
)
|
||||
|
||||
// Finally, verify the first block using the second's commit
|
||||
// NOTE: we can probably make this more efficient, but note that calling
|
||||
// Finally, verify the first block using the second's commit.
|
||||
//
|
||||
// NOTE: We can probably make this more efficient, but note that calling
|
||||
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
|
||||
// currently necessary.
|
||||
err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("invalid last commit: %w", err)
|
||||
bcR.Logger.Error(err.Error(),
|
||||
"last_commit", second.LastCommit, "block_id", firstID, "height", first.Height)
|
||||
r.Logger.Error(
|
||||
err.Error(),
|
||||
"last_commit", second.LastCommit,
|
||||
"block_id", firstID,
|
||||
"height", first.Height,
|
||||
)
|
||||
|
||||
peerID := bcR.pool.RedoRequest(first.Height)
|
||||
peer := bcR.Switch.Peers().Get(peerID)
|
||||
if peer != nil {
|
||||
// NOTE: we've already removed the peer's request, but we still need
|
||||
// to clean up the rest.
|
||||
bcR.Switch.StopPeerForError(peer, err)
|
||||
// NOTE: We've already removed the peer's request, but we still need
|
||||
// to clean up the rest.
|
||||
peerID := r.pool.RedoRequest(first.Height)
|
||||
r.blockchainCh.Error() <- p2p.PeerError{
|
||||
PeerID: peerID,
|
||||
Err: err,
|
||||
Severity: p2p.PeerErrorSeverityLow,
|
||||
}
|
||||
|
||||
peerID2 := bcR.pool.RedoRequest(second.Height)
|
||||
peerID2 := r.pool.RedoRequest(second.Height)
|
||||
if peerID2 != peerID {
|
||||
if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil {
|
||||
bcR.Switch.StopPeerForError(peer2, err)
|
||||
r.blockchainCh.Error() <- p2p.PeerError{
|
||||
PeerID: peerID2,
|
||||
Err: err,
|
||||
Severity: p2p.PeerErrorSeverityLow,
|
||||
}
|
||||
}
|
||||
|
||||
continue FOR_LOOP
|
||||
} else {
|
||||
bcR.pool.PopRequest()
|
||||
r.pool.PopRequest()
|
||||
|
||||
// TODO: batch saves so we dont persist to disk every block
|
||||
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
|
||||
// TODO: batch saves so we do not persist to disk every block
|
||||
r.store.SaveBlock(first, firstParts, second.LastCommit)
|
||||
|
||||
// TODO: same thing for app - but we would need a way to get the hash
|
||||
// without persisting the state.
|
||||
var err error
|
||||
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
|
||||
|
||||
// TODO: Same thing for app - but we would need a way to get the hash
|
||||
// without persisting the state.
|
||||
state, _, err = r.blockExec.ApplyBlock(state, firstID, first)
|
||||
if err != nil {
|
||||
// TODO This is bad, are we zombie?
|
||||
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
|
||||
// TODO: This is bad, are we zombie?
|
||||
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
|
||||
}
|
||||
|
||||
blocksSynced++
|
||||
|
||||
if blocksSynced%100 == 0 {
|
||||
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
|
||||
bcR.Logger.Info("Fast Sync Rate",
|
||||
"height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
|
||||
r.Logger.Info(
|
||||
"fast sync rate",
|
||||
"height", r.pool.height,
|
||||
"max_peer_height", r.pool.MaxPeerHeight(),
|
||||
"blocks/s", lastRate,
|
||||
)
|
||||
|
||||
lastHundred = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
continue FOR_LOOP
|
||||
|
||||
case <-bcR.Quit():
|
||||
case <-r.Quit():
|
||||
break FOR_LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
|
||||
func (bcR *BlockchainReactor) BroadcastStatusRequest() {
|
||||
bm, err := bc.EncodeMsg(&bcproto.StatusRequest{})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert StatusRequest to proto", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// We don't care about whenever broadcast is successful or not.
|
||||
_ = bcR.Switch.Broadcast(BlockchainChannel, bm)
|
||||
}
|
||||
|
||||
@@ -2,71 +2,59 @@ package v0
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/mempool/mock"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
)
|
||||
|
||||
var config *cfg.Config
|
||||
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
|
||||
validators := make([]types.GenesisValidator, numValidators)
|
||||
privValidators := make([]types.PrivValidator, numValidators)
|
||||
for i := 0; i < numValidators; i++ {
|
||||
val, privVal := types.RandValidator(randPower, minPower)
|
||||
validators[i] = types.GenesisValidator{
|
||||
PubKey: val.PubKey,
|
||||
Power: val.VotingPower,
|
||||
}
|
||||
privValidators[i] = privVal
|
||||
}
|
||||
sort.Sort(types.PrivValidatorsByAddress(privValidators))
|
||||
|
||||
return &types.GenesisDoc{
|
||||
GenesisTime: tmtime.Now(),
|
||||
ChainID: config.ChainID(),
|
||||
Validators: validators,
|
||||
}, privValidators
|
||||
}
|
||||
|
||||
type BlockchainReactorPair struct {
|
||||
reactor *BlockchainReactor
|
||||
type reactorTestSuite struct {
|
||||
reactor *Reactor
|
||||
app proxy.AppConns
|
||||
|
||||
peerID p2p.NodeID
|
||||
|
||||
blockchainChannel *p2p.Channel
|
||||
blockchainInCh chan p2p.Envelope
|
||||
blockchainOutCh chan p2p.Envelope
|
||||
blockchainPeerErrCh chan p2p.PeerError
|
||||
|
||||
peerUpdatesCh chan p2p.PeerUpdate
|
||||
peerUpdates *p2p.PeerUpdatesCh
|
||||
}
|
||||
|
||||
func newBlockchainReactor(
|
||||
logger log.Logger,
|
||||
func setup(
|
||||
t *testing.T,
|
||||
genDoc *types.GenesisDoc,
|
||||
privVals []types.PrivValidator,
|
||||
maxBlockHeight int64) BlockchainReactorPair {
|
||||
if len(privVals) != 1 {
|
||||
panic("only support one validator")
|
||||
}
|
||||
maxBlockHeight int64,
|
||||
chBuf uint,
|
||||
) *reactorTestSuite {
|
||||
t.Helper()
|
||||
|
||||
app := &testApp{}
|
||||
require.Len(t, privVals, 1, "only one validator can be supported")
|
||||
|
||||
app := &abci.BaseApplication{}
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
|
||||
proxyApp := proxy.NewAppConns(cc)
|
||||
err := proxyApp.Start()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error start app: %w", err))
|
||||
}
|
||||
require.NoError(t, proxyApp.Start())
|
||||
|
||||
blockDB := dbm.NewMemDB()
|
||||
stateDB := dbm.NewMemDB()
|
||||
@@ -74,25 +62,24 @@ func newBlockchainReactor(
|
||||
blockStore := store.NewBlockStore(blockDB)
|
||||
|
||||
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error constructing state from genesis file: %w", err))
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// Make the BlockchainReactor itself.
|
||||
// NOTE we have to create and commit the blocks first because
|
||||
// pool.height is determined from the store.
|
||||
fastSync := true
|
||||
db := dbm.NewMemDB()
|
||||
stateStore = sm.NewStore(db)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
|
||||
mock.Mempool{}, sm.EmptyEvidencePool{})
|
||||
if err = stateStore.Save(state); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// let's add some blocks in
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
proxyApp.Consensus(),
|
||||
mock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
)
|
||||
require.NoError(t, stateStore.Save(state))
|
||||
|
||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
|
||||
|
||||
if blockHeight > 1 {
|
||||
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
|
||||
lastBlock := blockStore.LoadBlock(blockHeight - 1)
|
||||
@@ -105,60 +92,194 @@ func newBlockchainReactor(
|
||||
lastBlock.Header.ChainID,
|
||||
time.Now(),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
lastCommit = types.NewCommit(vote.Height, vote.Round,
|
||||
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
|
||||
require.NoError(t, err)
|
||||
|
||||
lastCommit = types.NewCommit(
|
||||
vote.Height,
|
||||
vote.Round,
|
||||
lastBlockMeta.BlockID,
|
||||
[]types.CommitSig{vote.CommitSig()},
|
||||
)
|
||||
}
|
||||
|
||||
thisBlock := makeBlock(blockHeight, state, lastCommit)
|
||||
|
||||
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
|
||||
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
|
||||
|
||||
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error apply block: %w", err))
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
|
||||
}
|
||||
|
||||
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
pID := make([]byte, 16)
|
||||
_, err = rng.Read(pID)
|
||||
require.NoError(t, err)
|
||||
|
||||
return BlockchainReactorPair{bcReactor, proxyApp}
|
||||
peerUpdatesCh := make(chan p2p.PeerUpdate, chBuf)
|
||||
|
||||
rts := &reactorTestSuite{
|
||||
app: proxyApp,
|
||||
blockchainInCh: make(chan p2p.Envelope, chBuf),
|
||||
blockchainOutCh: make(chan p2p.Envelope, chBuf),
|
||||
blockchainPeerErrCh: make(chan p2p.PeerError, chBuf),
|
||||
peerUpdatesCh: peerUpdatesCh,
|
||||
peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh),
|
||||
peerID: p2p.NodeID(fmt.Sprintf("%x", pID)),
|
||||
}
|
||||
|
||||
rts.blockchainChannel = p2p.NewChannel(
|
||||
BlockchainChannel,
|
||||
new(bcproto.Message),
|
||||
rts.blockchainInCh,
|
||||
rts.blockchainOutCh,
|
||||
rts.blockchainPeerErrCh,
|
||||
)
|
||||
|
||||
rts.reactor = NewReactor(
|
||||
log.TestingLogger().With("module", "blockchain", "node", rts.peerID),
|
||||
state.Copy(),
|
||||
blockExec,
|
||||
blockStore,
|
||||
nil,
|
||||
rts.blockchainChannel,
|
||||
rts.peerUpdates,
|
||||
fastSync,
|
||||
)
|
||||
|
||||
require.NoError(t, rts.reactor.Start())
|
||||
require.True(t, rts.reactor.IsRunning())
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, rts.reactor.Stop())
|
||||
require.NoError(t, rts.app.Stop())
|
||||
require.False(t, rts.reactor.IsRunning())
|
||||
})
|
||||
|
||||
return rts
|
||||
}
|
||||
|
||||
func TestNoBlockResponse(t *testing.T) {
|
||||
config = cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||
func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropChErr bool) {
|
||||
// create a mapping for efficient suite lookup by peer ID
|
||||
suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite)
|
||||
for _, suite := range suites {
|
||||
suitesByPeerID[suite.peerID] = suite
|
||||
}
|
||||
|
||||
maxBlockHeight := int64(65)
|
||||
|
||||
reactorPairs := make([]BlockchainReactorPair, 2)
|
||||
|
||||
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
||||
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
|
||||
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
|
||||
defer func() {
|
||||
for _, r := range reactorPairs {
|
||||
err := r.reactor.Stop()
|
||||
require.NoError(t, err)
|
||||
err = r.app.Stop()
|
||||
require.NoError(t, err)
|
||||
// Simulate a router by listening for all outbound envelopes and proxying the
|
||||
// envelope to the respective peer (suite).
|
||||
go func() {
|
||||
for envelope := range primary.blockchainOutCh {
|
||||
if envelope.Broadcast {
|
||||
for _, s := range suites {
|
||||
// broadcast to everyone except source
|
||||
if s.peerID != primary.peerID {
|
||||
s.blockchainInCh <- p2p.Envelope{
|
||||
From: primary.peerID,
|
||||
To: s.peerID,
|
||||
Message: envelope.Message,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
suitesByPeerID[envelope.To].blockchainInCh <- p2p.Envelope{
|
||||
From: primary.peerID,
|
||||
To: envelope.To,
|
||||
Message: envelope.Message,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
tests := []struct {
|
||||
go func() {
|
||||
for pErr := range primary.blockchainPeerErrCh {
|
||||
if dropChErr {
|
||||
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
|
||||
} else {
|
||||
primary.peerUpdatesCh <- p2p.PeerUpdate{
|
||||
PeerID: pErr.PeerID,
|
||||
Status: p2p.PeerStatusRemoved,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestReactor_AbruptDisconnect(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
|
||||
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
|
||||
maxBlockHeight := int64(64)
|
||||
testSuites := []*reactorTestSuite{
|
||||
setup(t, genDoc, privVals, maxBlockHeight, 0),
|
||||
setup(t, genDoc, privVals, 0, 0),
|
||||
}
|
||||
|
||||
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
|
||||
|
||||
for _, s := range testSuites {
|
||||
simulateRouter(s, testSuites, true)
|
||||
|
||||
// connect reactor to every other reactor
|
||||
for _, ss := range testSuites {
|
||||
if s.peerID != ss.peerID {
|
||||
s.peerUpdatesCh <- p2p.PeerUpdate{
|
||||
Status: p2p.PeerStatusUp,
|
||||
PeerID: ss.peerID,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
secondaryPool := testSuites[1].reactor.pool
|
||||
require.Eventually(
|
||||
t,
|
||||
func() bool {
|
||||
height, _, _ := secondaryPool.GetStatus()
|
||||
return secondaryPool.MaxPeerHeight() > 0 && height > 0 && height < 10
|
||||
},
|
||||
10*time.Second,
|
||||
10*time.Millisecond,
|
||||
"expected node to be partially synced",
|
||||
)
|
||||
|
||||
// Remove synced node from the syncing node which should not result in any
|
||||
// deadlocks or race conditions within the context of poolRoutine.
|
||||
testSuites[1].peerUpdatesCh <- p2p.PeerUpdate{
|
||||
Status: p2p.PeerStatusDown,
|
||||
PeerID: testSuites[0].peerID,
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactor_NoBlockResponse(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
|
||||
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
|
||||
maxBlockHeight := int64(65)
|
||||
testSuites := []*reactorTestSuite{
|
||||
setup(t, genDoc, privVals, maxBlockHeight, 0),
|
||||
setup(t, genDoc, privVals, 0, 0),
|
||||
}
|
||||
|
||||
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
|
||||
|
||||
for _, s := range testSuites {
|
||||
simulateRouter(s, testSuites, true)
|
||||
|
||||
// connect reactor to every other reactor
|
||||
for _, ss := range testSuites {
|
||||
if s.peerID != ss.peerID {
|
||||
s.peerUpdatesCh <- p2p.PeerUpdate{
|
||||
Status: p2p.PeerStatusUp,
|
||||
PeerID: ss.peerID,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
height int64
|
||||
existent bool
|
||||
}{
|
||||
@@ -168,161 +289,114 @@ func TestNoBlockResponse(t *testing.T) {
|
||||
{100, false},
|
||||
}
|
||||
|
||||
for {
|
||||
if reactorPairs[1].reactor.pool.IsCaughtUp() {
|
||||
break
|
||||
}
|
||||
secondaryPool := testSuites[1].reactor.pool
|
||||
require.Eventually(
|
||||
t,
|
||||
func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
|
||||
10*time.Second,
|
||||
10*time.Millisecond,
|
||||
"expected node to be fully synced",
|
||||
)
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
|
||||
|
||||
for _, tt := range tests {
|
||||
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
|
||||
if tt.existent {
|
||||
assert.True(t, block != nil)
|
||||
for _, tc := range testCases {
|
||||
block := testSuites[1].reactor.store.LoadBlock(tc.height)
|
||||
if tc.existent {
|
||||
require.True(t, block != nil)
|
||||
} else {
|
||||
assert.True(t, block == nil)
|
||||
require.Nil(t, block)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: This is too hard to test without
|
||||
// an easy way to add test peer to switch
|
||||
// or without significant refactoring of the module.
|
||||
// Alternatively we could actually dial a TCP conn but
|
||||
// that seems extreme.
|
||||
func TestBadBlockStopsPeer(t *testing.T) {
|
||||
config = cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
func TestReactor_BadBlockStopsPeer(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||
|
||||
maxBlockHeight := int64(148)
|
||||
maxBlockHeight := int64(48)
|
||||
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
|
||||
|
||||
// Other chain needs a different validator set
|
||||
otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
|
||||
otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)
|
||||
testSuites := []*reactorTestSuite{
|
||||
setup(t, genDoc, privVals, maxBlockHeight, 1000), // fully synced node
|
||||
setup(t, genDoc, privVals, 0, 1000),
|
||||
setup(t, genDoc, privVals, 0, 1000),
|
||||
setup(t, genDoc, privVals, 0, 1000),
|
||||
setup(t, genDoc, privVals, 0, 1000), // new node
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := otherChain.reactor.Stop()
|
||||
require.Error(t, err)
|
||||
err = otherChain.app.Stop()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
|
||||
|
||||
reactorPairs := make([]BlockchainReactorPair, 4)
|
||||
for _, s := range testSuites[:len(testSuites)-1] {
|
||||
simulateRouter(s, testSuites, true)
|
||||
|
||||
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
||||
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
|
||||
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
|
||||
defer func() {
|
||||
for _, r := range reactorPairs {
|
||||
err := r.reactor.Stop()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = r.app.Stop()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
time.Sleep(1 * time.Second)
|
||||
caughtUp := true
|
||||
for _, r := range reactorPairs {
|
||||
if !r.reactor.pool.IsCaughtUp() {
|
||||
caughtUp = false
|
||||
// connect reactor to every other reactor except the new node
|
||||
for _, ss := range testSuites[:len(testSuites)-1] {
|
||||
if s.peerID != ss.peerID {
|
||||
s.peerUpdatesCh <- p2p.PeerUpdate{
|
||||
Status: p2p.PeerStatusUp,
|
||||
PeerID: ss.peerID,
|
||||
}
|
||||
}
|
||||
}
|
||||
if caughtUp {
|
||||
break
|
||||
}
|
||||
|
||||
require.Eventually(
|
||||
t,
|
||||
func() bool {
|
||||
caughtUp := true
|
||||
for _, s := range testSuites[1 : len(testSuites)-1] {
|
||||
if s.reactor.pool.MaxPeerHeight() == 0 || !s.reactor.pool.IsCaughtUp() {
|
||||
caughtUp = false
|
||||
}
|
||||
}
|
||||
|
||||
return caughtUp
|
||||
},
|
||||
10*time.Minute,
|
||||
10*time.Millisecond,
|
||||
"expected all nodes to be fully synced",
|
||||
)
|
||||
|
||||
for _, s := range testSuites[:len(testSuites)-1] {
|
||||
require.Len(t, s.reactor.pool.peers, 3)
|
||||
}
|
||||
|
||||
// Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect
|
||||
// from this peer.
|
||||
otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30)
|
||||
otherSuite := setup(t, otherGenDoc, otherPrivVals, maxBlockHeight, 0)
|
||||
testSuites[3].reactor.store = otherSuite.reactor.store
|
||||
|
||||
// add a fake peer just so we do not wait for the consensus ticker to timeout
|
||||
otherSuite.reactor.pool.SetPeerRange("00ff", 10, 10)
|
||||
|
||||
// start the new peer's faux router
|
||||
newSuite := testSuites[len(testSuites)-1]
|
||||
simulateRouter(newSuite, testSuites, false)
|
||||
|
||||
// connect all nodes to the new peer
|
||||
for _, s := range testSuites[:len(testSuites)-1] {
|
||||
newSuite.peerUpdatesCh <- p2p.PeerUpdate{
|
||||
Status: p2p.PeerStatusUp,
|
||||
PeerID: s.peerID,
|
||||
}
|
||||
}
|
||||
|
||||
// at this time, reactors[0-3] is the newest
|
||||
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
|
||||
// wait for the new peer to catch up and become fully synced
|
||||
require.Eventually(
|
||||
t,
|
||||
func() bool { return newSuite.reactor.pool.MaxPeerHeight() > 0 && newSuite.reactor.pool.IsCaughtUp() },
|
||||
10*time.Minute,
|
||||
10*time.Millisecond,
|
||||
"expected new node to be fully synced",
|
||||
)
|
||||
|
||||
// Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data
|
||||
// race, but can't be easily avoided.
|
||||
reactorPairs[3].reactor.store = otherChain.reactor.store
|
||||
|
||||
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
reactorPairs = append(reactorPairs, lastReactorPair)
|
||||
|
||||
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)...)
|
||||
|
||||
for i := 0; i < len(reactorPairs)-1; i++ {
|
||||
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
|
||||
}
|
||||
|
||||
for {
|
||||
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// utility funcs
|
||||
|
||||
func makeTxs(height int64) (txs []types.Tx) {
|
||||
for i := 0; i < 10; i++ {
|
||||
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
|
||||
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
|
||||
return block
|
||||
}
|
||||
|
||||
type testApp struct {
|
||||
abci.BaseApplication
|
||||
}
|
||||
|
||||
var _ abci.Application = (*testApp)(nil)
|
||||
|
||||
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
|
||||
return abci.ResponseInfo{}
|
||||
}
|
||||
|
||||
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
|
||||
return abci.ResponseBeginBlock{}
|
||||
}
|
||||
|
||||
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
|
||||
return abci.ResponseEndBlock{}
|
||||
}
|
||||
|
||||
func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
|
||||
return abci.ResponseDeliverTx{Events: []abci.Event{}}
|
||||
}
|
||||
|
||||
func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
|
||||
return abci.ResponseCheckTx{}
|
||||
}
|
||||
|
||||
func (app *testApp) Commit() abci.ResponseCommit {
|
||||
return abci.ResponseCommit{}
|
||||
}
|
||||
|
||||
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
|
||||
return
|
||||
require.Eventuallyf(
|
||||
t,
|
||||
func() bool { return len(newSuite.reactor.pool.peers) < len(testSuites)-1 },
|
||||
10*time.Minute,
|
||||
10*time.Millisecond,
|
||||
"invalid number of peers; expected < %d, got: %d",
|
||||
len(testSuites)-1,
|
||||
len(newSuite.reactor.pool.peers),
|
||||
)
|
||||
}
|
||||
|
||||
50
blockchain/v0/test_util.go
Normal file
50
blockchain/v0/test_util.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package v0
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
)
|
||||
|
||||
func randGenesisDoc(
|
||||
config *cfg.Config,
|
||||
numValidators int,
|
||||
randPower bool,
|
||||
minPower int64,
|
||||
) (*types.GenesisDoc, []types.PrivValidator) {
|
||||
validators := make([]types.GenesisValidator, numValidators)
|
||||
privValidators := make([]types.PrivValidator, numValidators)
|
||||
|
||||
for i := 0; i < numValidators; i++ {
|
||||
val, privVal := types.RandValidator(randPower, minPower)
|
||||
validators[i] = types.GenesisValidator{
|
||||
PubKey: val.PubKey,
|
||||
Power: val.VotingPower,
|
||||
}
|
||||
|
||||
privValidators[i] = privVal
|
||||
}
|
||||
|
||||
sort.Sort(types.PrivValidatorsByAddress(privValidators))
|
||||
|
||||
return &types.GenesisDoc{
|
||||
GenesisTime: tmtime.Now(),
|
||||
ChainID: config.ChainID(),
|
||||
Validators: validators,
|
||||
}, privValidators
|
||||
}
|
||||
|
||||
func makeTxs(height int64) (txs []types.Tx) {
|
||||
for i := 0; i < 10; i++ {
|
||||
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
|
||||
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
|
||||
return block
|
||||
}
|
||||
@@ -3,7 +3,7 @@ package v2
|
||||
import (
|
||||
"errors"
|
||||
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
@@ -48,7 +48,13 @@ type consensusReactor interface {
|
||||
}
|
||||
|
||||
func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := msgProto.Wrap(&bcproto.BlockRequest{Height: height}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := proto.Marshal(msgProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -61,7 +67,13 @@ func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
|
||||
}
|
||||
|
||||
func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error {
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := msgProto.Wrap(&bcproto.StatusResponse{Height: height, Base: base}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := proto.Marshal(msgProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -83,10 +95,17 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bpb})
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := msgProto.Wrap(&bcproto.BlockResponse{Block: bpb}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := proto.Marshal(msgProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
|
||||
return errPeerQueueFull
|
||||
}
|
||||
@@ -95,7 +114,13 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
|
||||
}
|
||||
|
||||
func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error {
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height})
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := msgProto.Wrap(&bcproto.NoBlockResponse{Height: height}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := proto.Marshal(msgProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -116,7 +141,13 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool
|
||||
}
|
||||
|
||||
func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := msgProto.Wrap(&bcproto.StatusRequest{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := proto.Marshal(msgProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -129,7 +160,13 @@ func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
|
||||
}
|
||||
|
||||
func (sio *switchIO) broadcastStatusRequest() error {
|
||||
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := msgProto.Wrap(&bcproto.StatusRequest{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgBytes, err := proto.Marshal(msgProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
|
||||
"github.com/tendermint/tendermint/behaviour"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -466,20 +468,28 @@ func (r *BlockchainReactor) Stop() error {
|
||||
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
logger := r.logger.With("src", src.ID(), "chID", chID)
|
||||
|
||||
msg, err := bc.DecodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
msgProto := new(bcproto.Message)
|
||||
|
||||
if err := proto.Unmarshal(msgBytes, msgProto); err != nil {
|
||||
logger.Error("error decoding message", "err", err)
|
||||
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
if err = bc.ValidateMsg(msg); err != nil {
|
||||
logger.Error("peer sent us invalid msg", "msg", msg, "err", err)
|
||||
if err := msgProto.Validate(); err != nil {
|
||||
logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err)
|
||||
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
r.logger.Debug("Receive", "msg", msg)
|
||||
msg, err := msgProto.Unwrap()
|
||||
if err != nil {
|
||||
logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err)
|
||||
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
r.logger.Debug("received", "msg", msg)
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *bcproto.StatusRequest:
|
||||
|
||||
@@ -9,13 +9,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/behaviour"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
@@ -409,23 +409,37 @@ func TestReactorHelperMode(t *testing.T) {
|
||||
switch ev := step.event.(type) {
|
||||
case bcproto.StatusRequest:
|
||||
old := mockSwitch.numStatusResponse
|
||||
msg, err := bc.EncodeMsg(&ev)
|
||||
assert.NoError(t, err)
|
||||
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
|
||||
|
||||
msgProto := new(bcproto.Message)
|
||||
require.NoError(t, msgProto.Wrap(&ev))
|
||||
|
||||
msgBz, err := proto.Marshal(msgProto)
|
||||
require.NoError(t, err)
|
||||
|
||||
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
|
||||
assert.Equal(t, old+1, mockSwitch.numStatusResponse)
|
||||
case bcproto.BlockRequest:
|
||||
if ev.Height > params.startHeight {
|
||||
old := mockSwitch.numNoBlockResponse
|
||||
msg, err := bc.EncodeMsg(&ev)
|
||||
assert.NoError(t, err)
|
||||
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
|
||||
|
||||
msgProto := new(bcproto.Message)
|
||||
require.NoError(t, msgProto.Wrap(&ev))
|
||||
|
||||
msgBz, err := proto.Marshal(msgProto)
|
||||
require.NoError(t, err)
|
||||
|
||||
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
|
||||
assert.Equal(t, old+1, mockSwitch.numNoBlockResponse)
|
||||
} else {
|
||||
old := mockSwitch.numBlockResponse
|
||||
msg, err := bc.EncodeMsg(&ev)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, err)
|
||||
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
|
||||
|
||||
msgProto := new(bcproto.Message)
|
||||
require.NoError(t, msgProto.Wrap(&ev))
|
||||
|
||||
msgBz, err := proto.Marshal(msgProto)
|
||||
require.NoError(t, err)
|
||||
|
||||
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
|
||||
assert.Equal(t, old+1, mockSwitch.numBlockResponse)
|
||||
}
|
||||
}
|
||||
|
||||
86
node/node.go
86
node/node.go
@@ -192,7 +192,7 @@ type Node struct {
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor p2p.Reactor // for fast-syncing
|
||||
bcReactor service.Service // for fast-syncing
|
||||
mempoolReactor *mempl.Reactor // for gossipping transactions
|
||||
mempool mempl.Mempool
|
||||
stateSync bool // whether the node should state sync on startup
|
||||
@@ -370,24 +370,37 @@ func createEvidenceReactor(
|
||||
return evidenceReactorShim, evidenceReactor, evidencePool, nil
|
||||
}
|
||||
|
||||
func createBlockchainReactor(config *cfg.Config,
|
||||
func createBlockchainReactor(
|
||||
logger log.Logger,
|
||||
config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore *store.BlockStore,
|
||||
csReactor *cs.Reactor,
|
||||
fastSync bool,
|
||||
logger log.Logger) (bcReactor p2p.Reactor, err error) {
|
||||
) (*p2p.ReactorShim, service.Service, error) {
|
||||
|
||||
logger = logger.With("module", "blockchain")
|
||||
|
||||
switch config.FastSync.Version {
|
||||
case "v0":
|
||||
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
case "v2":
|
||||
bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
|
||||
}
|
||||
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
|
||||
reactor := bcv0.NewReactor(
|
||||
logger, state.Copy(), blockExec, blockStore, csReactor,
|
||||
reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
|
||||
)
|
||||
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
return bcReactor, nil
|
||||
return reactorShim, reactor, nil
|
||||
|
||||
case "v2":
|
||||
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
reactor.SetLogger(logger)
|
||||
|
||||
return nil, reactor, nil
|
||||
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func createConsensusReactor(config *cfg.Config,
|
||||
@@ -747,12 +760,28 @@ func NewNode(config *cfg.Config,
|
||||
sm.BlockExecutorWithMetrics(smMetrics),
|
||||
)
|
||||
|
||||
// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
|
||||
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
|
||||
csReactor, csState := createConsensusReactor(
|
||||
config, state, blockExec, blockStore, mempool, evPool,
|
||||
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
|
||||
)
|
||||
|
||||
// Create the blockchain reactor. Note, we do not start fast sync if we're
|
||||
// doing a state sync first.
|
||||
bcReactorShim, bcReactor, err := createBlockchainReactor(
|
||||
logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
|
||||
}
|
||||
|
||||
// TODO: Remove this once the switch is removed.
|
||||
var bcReactorForSwitch p2p.Reactor
|
||||
if bcReactorShim != nil {
|
||||
bcReactorForSwitch = bcReactorShim
|
||||
} else {
|
||||
bcReactorForSwitch = bcReactor.(p2p.Reactor)
|
||||
}
|
||||
|
||||
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
|
||||
// FIXME We need to update metrics here, since other reactors don't have access to them.
|
||||
if stateSync {
|
||||
@@ -761,11 +790,6 @@ func NewNode(config *cfg.Config,
|
||||
csMetrics.FastSyncing.Set(1)
|
||||
}
|
||||
|
||||
consensusReactor, consensusState := createConsensusReactor(
|
||||
config, state, blockExec, blockStore, mempool, evPool,
|
||||
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
|
||||
)
|
||||
|
||||
// Set up state sync reactor, and schedule a sync if requested.
|
||||
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
|
||||
// we should clean this whole thing up. See:
|
||||
@@ -791,8 +815,8 @@ func NewNode(config *cfg.Config,
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
|
||||
sw := createSwitch(
|
||||
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
|
||||
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
|
||||
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
|
||||
stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
|
||||
)
|
||||
|
||||
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
|
||||
@@ -850,8 +874,8 @@ func NewNode(config *cfg.Config,
|
||||
bcReactor: bcReactor,
|
||||
mempoolReactor: mempoolReactor,
|
||||
mempool: mempool,
|
||||
consensusState: consensusState,
|
||||
consensusReactor: consensusReactor,
|
||||
consensusState: csState,
|
||||
consensusReactor: csReactor,
|
||||
stateSyncReactor: stateSyncReactor,
|
||||
stateSync: stateSync,
|
||||
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
|
||||
@@ -924,6 +948,13 @@ func (n *Node) OnStart() error {
|
||||
|
||||
n.isListening = true
|
||||
|
||||
if n.config.FastSync.Version == "v0" {
|
||||
// Start the real blockchain reactor separately since the switch uses the shim.
|
||||
if err := n.bcReactor.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Start the real state sync reactor separately since the switch uses the shim.
|
||||
if err := n.stateSyncReactor.Start(); err != nil {
|
||||
return err
|
||||
@@ -975,6 +1006,13 @@ func (n *Node) OnStop() {
|
||||
n.Logger.Error("Error closing switch", "err", err)
|
||||
}
|
||||
|
||||
if n.config.FastSync.Version == "v0" {
|
||||
// Stop the real blockchain reactor separately since the switch uses the shim.
|
||||
if err := n.bcReactor.Stop(); err != nil {
|
||||
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the real state sync reactor separately since the switch uses the shim.
|
||||
if err := n.stateSyncReactor.Stop(); err != nil {
|
||||
n.Logger.Error("failed to stop the state sync reactor", "err", err)
|
||||
@@ -1286,9 +1324,11 @@ func makeNodeInfo(
|
||||
var bcChannel byte
|
||||
switch config.FastSync.Version {
|
||||
case "v0":
|
||||
bcChannel = bcv0.BlockchainChannel
|
||||
bcChannel = byte(bcv0.BlockchainChannel)
|
||||
|
||||
case "v2":
|
||||
bcChannel = bcv2.BlockchainChannel
|
||||
|
||||
default:
|
||||
return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
|
||||
}
|
||||
|
||||
102
proto/tendermint/blockchain/message.go
Normal file
102
proto/tendermint/blockchain/message.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
fmt "fmt"
|
||||
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
// Wrap implements the p2p Wrapper interface and wraps a blockchain messages.
|
||||
func (m *Message) Wrap(pb proto.Message) error {
|
||||
switch msg := pb.(type) {
|
||||
case *BlockRequest:
|
||||
m.Sum = &Message_BlockRequest{BlockRequest: msg}
|
||||
|
||||
case *BlockResponse:
|
||||
m.Sum = &Message_BlockResponse{BlockResponse: msg}
|
||||
|
||||
case *NoBlockResponse:
|
||||
m.Sum = &Message_NoBlockResponse{NoBlockResponse: msg}
|
||||
|
||||
case *StatusRequest:
|
||||
m.Sum = &Message_StatusRequest{StatusRequest: msg}
|
||||
|
||||
case *StatusResponse:
|
||||
m.Sum = &Message_StatusResponse{StatusResponse: msg}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_BlockRequest:
|
||||
return m.GetBlockRequest(), nil
|
||||
|
||||
case *Message_BlockResponse:
|
||||
return m.GetBlockResponse(), nil
|
||||
|
||||
case *Message_NoBlockResponse:
|
||||
return m.GetNoBlockResponse(), nil
|
||||
|
||||
case *Message_StatusRequest:
|
||||
return m.GetStatusRequest(), nil
|
||||
|
||||
case *Message_StatusResponse:
|
||||
return m.GetStatusResponse(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate validates the message returning an error upon failure.
|
||||
func (m *Message) Validate() error {
|
||||
if m == nil {
|
||||
return errors.New("message cannot be nil")
|
||||
}
|
||||
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_BlockRequest:
|
||||
if m.GetBlockRequest().Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
|
||||
case *Message_BlockResponse:
|
||||
// validate basic is called later when converting from proto
|
||||
return nil
|
||||
|
||||
case *Message_NoBlockResponse:
|
||||
if m.GetNoBlockResponse().Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
|
||||
case *Message_StatusResponse:
|
||||
if m.GetStatusResponse().Base < 0 {
|
||||
return errors.New("negative Base")
|
||||
}
|
||||
if m.GetStatusResponse().Height < 0 {
|
||||
return errors.New("negative Height")
|
||||
}
|
||||
if m.GetStatusResponse().Base > m.GetStatusResponse().Height {
|
||||
return fmt.Errorf(
|
||||
"base %v cannot be greater than height %v",
|
||||
m.GetStatusResponse().Base, m.GetStatusResponse().Height,
|
||||
)
|
||||
}
|
||||
|
||||
case *Message_StatusRequest:
|
||||
return nil
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown message type: %T", msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,19 +1,18 @@
|
||||
package blockchain
|
||||
package blockchain_test
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"math"
|
||||
math "math"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestBcBlockRequestMessageValidateBasic(t *testing.T) {
|
||||
func TestBlockRequest_Validate(t *testing.T) {
|
||||
testCases := []struct {
|
||||
testName string
|
||||
requestHeight int64
|
||||
@@ -27,13 +26,15 @@ func TestBcBlockRequestMessageValidateBasic(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
request := bcproto.BlockRequest{Height: tc.requestHeight}
|
||||
assert.Equal(t, tc.expectErr, ValidateMsg(&request) != nil, "Validate Basic had an unexpected result")
|
||||
msg := &bcproto.Message{}
|
||||
require.NoError(t, msg.Wrap(&bcproto.BlockRequest{Height: tc.requestHeight}))
|
||||
|
||||
require.Equal(t, tc.expectErr, msg.Validate() != nil)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBcNoBlockResponseMessageValidateBasic(t *testing.T) {
|
||||
func TestNoBlockResponse_Validate(t *testing.T) {
|
||||
testCases := []struct {
|
||||
testName string
|
||||
nonResponseHeight int64
|
||||
@@ -47,18 +48,21 @@ func TestBcNoBlockResponseMessageValidateBasic(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
nonResponse := bcproto.NoBlockResponse{Height: tc.nonResponseHeight}
|
||||
assert.Equal(t, tc.expectErr, ValidateMsg(&nonResponse) != nil, "Validate Basic had an unexpected result")
|
||||
msg := &bcproto.Message{}
|
||||
require.NoError(t, msg.Wrap(&bcproto.NoBlockResponse{Height: tc.nonResponseHeight}))
|
||||
|
||||
require.Equal(t, tc.expectErr, msg.Validate() != nil)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBcStatusRequestMessageValidateBasic(t *testing.T) {
|
||||
request := bcproto.StatusRequest{}
|
||||
assert.NoError(t, ValidateMsg(&request))
|
||||
func TestStatusRequest_Validate(t *testing.T) {
|
||||
msg := &bcproto.Message{}
|
||||
require.NoError(t, msg.Wrap(&bcproto.StatusRequest{}))
|
||||
require.NoError(t, msg.Validate())
|
||||
}
|
||||
|
||||
func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
|
||||
func TestStatusResponse_Validate(t *testing.T) {
|
||||
testCases := []struct {
|
||||
testName string
|
||||
responseHeight int64
|
||||
@@ -72,13 +76,15 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
response := bcproto.StatusResponse{Height: tc.responseHeight}
|
||||
assert.Equal(t, tc.expectErr, ValidateMsg(&response) != nil, "Validate Basic had an unexpected result")
|
||||
msg := &bcproto.Message{}
|
||||
require.NoError(t, msg.Wrap(&bcproto.StatusResponse{Height: tc.responseHeight}))
|
||||
|
||||
require.Equal(t, tc.expectErr, msg.Validate() != nil)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// nolint:lll // ignore line length in tests
|
||||
// nolint:lll
|
||||
func TestBlockchainMessageVectors(t *testing.T) {
|
||||
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
|
||||
block.Version.Block = 11 // overwrite updated protocol version
|
||||
@@ -117,8 +123,8 @@ func TestBlockchainMessageVectors(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
bz, _ := proto.Marshal(tc.bmsg)
|
||||
|
||||
bz, err := proto.Marshal(tc.bmsg)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expBytes, hex.EncodeToString(bz))
|
||||
})
|
||||
}
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
)
|
||||
|
||||
// Wrap implements the p2p Wrapper interface and wraps a state sync messages.
|
||||
func (m *Message) Wrap(msg proto.Message) error {
|
||||
switch msg := msg.(type) {
|
||||
func (m *Message) Wrap(pb proto.Message) error {
|
||||
switch msg := pb.(type) {
|
||||
case *ChunkRequest:
|
||||
m.Sum = &Message_ChunkRequest{ChunkRequest: msg}
|
||||
|
||||
|
||||
@@ -349,7 +349,7 @@ func (r *Reactor) processSnapshotCh() {
|
||||
select {
|
||||
case envelope := <-r.snapshotCh.In():
|
||||
if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
|
||||
r.Logger.Error("failed to process envelope", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
|
||||
r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
|
||||
r.snapshotCh.Error() <- p2p.PeerError{
|
||||
PeerID: envelope.From,
|
||||
Err: err,
|
||||
@@ -376,7 +376,7 @@ func (r *Reactor) processChunkCh() {
|
||||
select {
|
||||
case envelope := <-r.chunkCh.In():
|
||||
if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
|
||||
r.Logger.Error("failed to process envelope", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
|
||||
r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
|
||||
r.chunkCh.Error() <- p2p.PeerError{
|
||||
PeerID: envelope.From,
|
||||
Err: err,
|
||||
|
||||
@@ -233,7 +233,7 @@ type Node struct {
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor p2p.Reactor // for fast-syncing
|
||||
bcReactor service.Service // for fast-syncing
|
||||
mempoolReactor *mempl.Reactor // for gossipping transactions
|
||||
mempool mempl.Mempool
|
||||
stateSync bool // whether the node should state sync on startup
|
||||
@@ -411,24 +411,37 @@ func createEvidenceReactor(
|
||||
return evidenceReactorShim, evidenceReactor, evidencePool, nil
|
||||
}
|
||||
|
||||
func createBlockchainReactor(config *cfg.Config,
|
||||
func createBlockchainReactor(
|
||||
logger log.Logger,
|
||||
config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore *store.BlockStore,
|
||||
csReactor *cs.Reactor,
|
||||
fastSync bool,
|
||||
logger log.Logger) (bcReactor p2p.Reactor, err error) {
|
||||
) (*p2p.ReactorShim, service.Service, error) {
|
||||
|
||||
logger = logger.With("module", "blockchain")
|
||||
|
||||
switch config.FastSync.Version {
|
||||
case "v0":
|
||||
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
case "v2":
|
||||
bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
|
||||
}
|
||||
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
|
||||
reactor := bcv0.NewReactor(
|
||||
logger, state.Copy(), blockExec, blockStore, csReactor,
|
||||
reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
|
||||
)
|
||||
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
return bcReactor, nil
|
||||
return reactorShim, reactor, nil
|
||||
|
||||
case "v2":
|
||||
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
reactor.SetLogger(logger)
|
||||
|
||||
return nil, reactor, nil
|
||||
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func createConsensusReactor(config *cfg.Config,
|
||||
@@ -780,12 +793,29 @@ func NewNode(config *cfg.Config,
|
||||
sm.BlockExecutorWithMetrics(smMetrics),
|
||||
)
|
||||
|
||||
// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
|
||||
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
|
||||
logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
|
||||
csReactor, csState := createConsensusReactor(
|
||||
config, state, blockExec, blockStore, mempool, evPool,
|
||||
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors,
|
||||
)
|
||||
|
||||
// Create the blockchain reactor. Note, we do not start fast sync if we're
|
||||
// doing a state sync first.
|
||||
bcReactorShim, bcReactor, err := createBlockchainReactor(
|
||||
logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
|
||||
}
|
||||
|
||||
// TODO: Remove this once the switch is removed.
|
||||
var bcReactorForSwitch p2p.Reactor
|
||||
if bcReactorShim != nil {
|
||||
bcReactorForSwitch = bcReactorShim
|
||||
} else {
|
||||
bcReactorForSwitch = bcReactor.(p2p.Reactor)
|
||||
}
|
||||
|
||||
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
|
||||
// FIXME We need to update metrics here, since other reactors don't have access to them.
|
||||
if stateSync {
|
||||
@@ -794,11 +824,6 @@ func NewNode(config *cfg.Config,
|
||||
csMetrics.FastSyncing.Set(1)
|
||||
}
|
||||
|
||||
logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
|
||||
consensusReactor, consensusState := createConsensusReactor(
|
||||
config, state, blockExec, blockStore, mempool, evPool,
|
||||
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors)
|
||||
|
||||
// Set up state sync reactor, and schedule a sync if requested.
|
||||
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
|
||||
// we should clean this whole thing up. See:
|
||||
@@ -824,8 +849,8 @@ func NewNode(config *cfg.Config,
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
|
||||
sw := createSwitch(
|
||||
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
|
||||
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
|
||||
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
|
||||
stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
|
||||
)
|
||||
|
||||
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
|
||||
@@ -883,8 +908,8 @@ func NewNode(config *cfg.Config,
|
||||
bcReactor: bcReactor,
|
||||
mempoolReactor: mempoolReactor,
|
||||
mempool: mempool,
|
||||
consensusState: consensusState,
|
||||
consensusReactor: consensusReactor,
|
||||
consensusState: csState,
|
||||
consensusReactor: csReactor,
|
||||
stateSyncReactor: stateSyncReactor,
|
||||
stateSync: stateSync,
|
||||
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
|
||||
@@ -957,6 +982,13 @@ func (n *Node) OnStart() error {
|
||||
|
||||
n.isListening = true
|
||||
|
||||
if n.config.FastSync.Version == "v0" {
|
||||
// Start the real blockchain reactor separately since the switch uses the shim.
|
||||
if err := n.bcReactor.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Start the real state sync reactor separately since the switch uses the shim.
|
||||
if err := n.stateSyncReactor.Start(); err != nil {
|
||||
return err
|
||||
@@ -1008,6 +1040,13 @@ func (n *Node) OnStop() {
|
||||
n.Logger.Error("Error closing switch", "err", err)
|
||||
}
|
||||
|
||||
if n.config.FastSync.Version == "v0" {
|
||||
// Stop the real blockchain reactor separately since the switch uses the shim.
|
||||
if err := n.bcReactor.Stop(); err != nil {
|
||||
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the real state sync reactor separately since the switch uses the shim.
|
||||
if err := n.stateSyncReactor.Stop(); err != nil {
|
||||
n.Logger.Error("failed to stop the state sync reactor", "err", err)
|
||||
@@ -1317,9 +1356,11 @@ func makeNodeInfo(
|
||||
var bcChannel byte
|
||||
switch config.FastSync.Version {
|
||||
case "v0":
|
||||
bcChannel = bcv0.BlockchainChannel
|
||||
bcChannel = byte(bcv0.BlockchainChannel)
|
||||
|
||||
case "v2":
|
||||
bcChannel = bcv2.BlockchainChannel
|
||||
|
||||
default:
|
||||
return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user