blockchain v0: initial commit

This commit is contained in:
Aleksandr Bezobchuk
2021-01-04 14:41:01 -05:00
parent 1570d26f84
commit b48a7f4d7e
2 changed files with 199 additions and 84 deletions

View File

@@ -7,6 +7,7 @@ import (
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
sm "github.com/tendermint/tendermint/state"
@@ -14,18 +15,39 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
_ service.Service = (*Reactor)(nil)
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
// p2p proto.Message the new p2p Channel is responsible for handling.
//
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
BlockchainChannel: {
MsgType: new(bcproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(BlockchainChannel),
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
},
},
}
)
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
// BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = p2p.ChannelID(0x40)
trySyncIntervalMS = 10
// stop syncing when last block's time is
// within this much of the system time.
// stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
@@ -49,8 +71,8 @@ func (e peerError) Error() string {
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
type Reactor struct {
service.BaseService
// immutable
initialState sm.State
@@ -60,61 +82,198 @@ type BlockchainReactor struct {
pool *BlockPool
fastSync bool
blockchainCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
closeCh chan struct{}
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
// NewReactor returns new reactor instance.
func NewReactor(
logger log.Logger,
state sm.State,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
blockchainCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
fastSync bool,
) *Reactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
startHeight := store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
pool := NewBlockPool(startHeight, requestsCh, errorsCh)
bcR := &BlockchainReactor{
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, 1000) // NOTE: The capacity should be larger than the peer count.
r := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
fastSync: fastSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
blockchainCh: blockchainCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
r.BaseService = *service.NewBaseService(logger, "BlockchainReactor", r)
return r
}
// SetLogger implements service.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.pool.Logger = l
}
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
err := bcR.pool.Start()
if err != nil {
// OnStart starts separate go routines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
//
// If fastSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart() error {
if r.fastSync {
if err := r.pool.Start(); err != nil {
return err
}
go bcR.poolRoutine(false)
go r.poolRoutine(false)
}
go r.processBlockchainCh()
go r.processPeerUpdates()
return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
if r.fastSync {
if err := r.pool.Stop(); err != nil {
r.Logger.Error("failed to stop pool", "err", err)
}
}
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
// Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur.
<-r.blockchainCh.Done()
<-r.peerUpdates.Done()
}
func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
panic("IMPLEMENT ME!")
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
r.Logger.Error("recovering from processing message panic", "err", err)
}
}()
switch chID {
case BlockchainChannel:
err = r.handleBlockchainMessage(envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
return err
}
// processBlockchainCh initiates a blocking process where we listen for and handle
// envelopes on the BlockchainChannel. Any error encountered during message
// execution will result in a PeerError being sent on the BlockchainChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processBlockchainCh() {
defer r.blockchainCh.Close()
for {
select {
case envelope := <-r.blockchainCh.In():
if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil {
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on blockchain channel; closing...")
return
}
}
}
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing peer update: %v", e)
r.Logger.Error("recovering from processing peer update panic", "err", err)
}
}()
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
switch peerUpdate.Status {
case p2p.PeerStatusNew, p2p.PeerStatusUp:
r.blockchainCh.Out() <- p2p.Envelope{
To: peerUpdate.PeerID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
Height: r.store.Height(),
},
}
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
r.pool.RemovePeer(peerUpdate.PeerID)
}
return err
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
defer r.peerUpdates.Close()
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
_ = r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.Logger.Debug("stopped listening on peer updates channel; closing...")
return
}
}
}
// ============================================================================
// ============================================================================
// ============================================================================
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
bcR.fastSync = true
@@ -129,50 +288,6 @@ func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
return nil
}
// OnStop implements service.Service.
func (bcR *BlockchainReactor) OnStop() {
if bcR.fastSync {
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
}
}
// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
_ = peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerRange
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.pool.RemovePeer(peer.ID())
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
@@ -261,7 +376,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
func (r *Reactor) poolRoutine(stateSynced bool) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second)

View File

@@ -341,7 +341,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel. Any error encountered during message
// execution will result in a PeerError being sent on the SnapshotChannel. When
// the reactor is stopped, we will catch the singal and close the p2p Channel
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processSnapshotCh() {
defer r.snapshotCh.Close()
@@ -367,7 +367,7 @@ func (r *Reactor) processSnapshotCh() {
// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel. Any error encountered during message
// execution will result in a PeerError being sent on the ChunkChannel. When
// the reactor is stopped, we will catch the singal and close the p2p Channel
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processChunkCh() {
defer r.chunkCh.Close()
@@ -419,7 +419,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) {
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the singal and
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
defer r.peerUpdates.Close()