diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 770963793..d257b6f1b 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -7,6 +7,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" sm "github.com/tendermint/tendermint/state" @@ -14,18 +15,39 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + _ service.Service = (*Reactor)(nil) + + // ChannelShims contains a map of ChannelDescriptorShim objects, where each + // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding + // p2p proto.Message the new p2p Channel is responsible for handling. + // + // + // TODO: Remove once p2p refactor is complete. + // ref: https://github.com/tendermint/tendermint/issues/5670 + ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ + BlockchainChannel: { + MsgType: new(bcproto.Message), + Descriptor: &p2p.ChannelDescriptor{ + ID: byte(BlockchainChannel), + Priority: 5, + SendQueueCapacity: 1000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: bc.MaxMsgSize, + }, + }, + } +) + const ( - // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) - BlockchainChannel = byte(0x40) + // BlockchainChannel is a channel for blocks and status updates + BlockchainChannel = p2p.ChannelID(0x40) trySyncIntervalMS = 10 - // stop syncing when last block's time is - // within this much of the system time. - // stopSyncingDurationMinutes = 10 - // ask for best height every 10s statusUpdateIntervalSeconds = 10 + // check if we should switch to consensus reactor switchToConsensusIntervalSeconds = 1 @@ -49,8 +71,8 @@ func (e peerError) Error() string { } // BlockchainReactor handles long-term catchup syncing. -type BlockchainReactor struct { - p2p.BaseReactor +type Reactor struct { + service.BaseService // immutable initialState sm.State @@ -60,61 +82,198 @@ type BlockchainReactor struct { pool *BlockPool fastSync bool + blockchainCh *p2p.Channel + peerUpdates *p2p.PeerUpdatesCh + closeCh chan struct{} + requestsCh <-chan BlockRequest errorsCh <-chan peerError } -// NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, - fastSync bool) *BlockchainReactor { - +// NewReactor returns new reactor instance. +func NewReactor( + logger log.Logger, + state sm.State, + blockExec *sm.BlockExecutor, + store *store.BlockStore, + blockchainCh *p2p.Channel, + peerUpdates *p2p.PeerUpdatesCh, + fastSync bool, +) *Reactor { if state.LastBlockHeight != store.Height() { - panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, - store.Height())) + panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) } - requestsCh := make(chan BlockRequest, maxTotalRequesters) - - const capacity = 1000 // must be bigger than peers count - errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock - startHeight := store.Height() + 1 if startHeight == 1 { startHeight = state.InitialHeight } - pool := NewBlockPool(startHeight, requestsCh, errorsCh) - bcR := &BlockchainReactor{ + requestsCh := make(chan BlockRequest, maxTotalRequesters) + errorsCh := make(chan peerError, 1000) // NOTE: The capacity should be larger than the peer count. + + r := &Reactor{ initialState: state, blockExec: blockExec, store: store, - pool: pool, + pool: NewBlockPool(startHeight, requestsCh, errorsCh), fastSync: fastSync, requestsCh: requestsCh, errorsCh: errorsCh, + blockchainCh: blockchainCh, + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), } - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) - return bcR + + r.BaseService = *service.NewBaseService(logger, "BlockchainReactor", r) + return r } -// SetLogger implements service.Service by setting the logger on reactor and pool. -func (bcR *BlockchainReactor) SetLogger(l log.Logger) { - bcR.BaseService.Logger = l - bcR.pool.Logger = l -} - -// OnStart implements service.Service. -func (bcR *BlockchainReactor) OnStart() error { - if bcR.fastSync { - err := bcR.pool.Start() - if err != nil { +// OnStart starts separate go routines for each p2p Channel and listens for +// envelopes on each. In addition, it also listens for peer updates and handles +// messages on that p2p channel accordingly. The caller must be sure to execute +// OnStop to ensure the outbound p2p Channels are closed. +// +// If fastSync is enabled, we also start the pool and the pool processing +// goroutine. If the pool fails to start, an error is returned. +func (r *Reactor) OnStart() error { + if r.fastSync { + if err := r.pool.Start(); err != nil { return err } - go bcR.poolRoutine(false) + + go r.poolRoutine(false) } + + go r.processBlockchainCh() + go r.processPeerUpdates() + return nil } +// OnStop stops the reactor by signaling to all spawned goroutines to exit and +// blocking until they all exit. +func (r *Reactor) OnStop() { + if r.fastSync { + if err := r.pool.Stop(); err != nil { + r.Logger.Error("failed to stop pool", "err", err) + } + } + + // Close closeCh to signal to all spawned goroutines to gracefully exit. All + // p2p Channels should execute Close(). + close(r.closeCh) + + // Wait for all p2p Channels to be closed before returning. This ensures we + // can easily reason about synchronization of all p2p Channels and ensure no + // panics will occur. + <-r.blockchainCh.Done() + <-r.peerUpdates.Done() +} + +func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { + panic("IMPLEMENT ME!") +} + +// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. +// It will handle errors and any possible panics gracefully. A caller can handle +// any error returned by sending a PeerError on the respective channel. +func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("panic in processing message: %v", e) + r.Logger.Error("recovering from processing message panic", "err", err) + } + }() + + switch chID { + case BlockchainChannel: + err = r.handleBlockchainMessage(envelope) + + default: + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + } + + return err +} + +// processBlockchainCh initiates a blocking process where we listen for and handle +// envelopes on the BlockchainChannel. Any error encountered during message +// execution will result in a PeerError being sent on the BlockchainChannel. When +// the reactor is stopped, we will catch the signal and close the p2p Channel +// gracefully. +func (r *Reactor) processBlockchainCh() { + defer r.blockchainCh.Close() + + for { + select { + case envelope := <-r.blockchainCh.In(): + if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil { + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: envelope.From, + Err: err, + Severity: p2p.PeerErrorSeverityLow, + } + } + + case <-r.closeCh: + r.Logger.Debug("stopped listening on blockchain channel; closing...") + return + } + } +} + +// processPeerUpdate processes a PeerUpdate, returning an error upon failing to +// handle the PeerUpdate or if a panic is recovered. +func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("panic in processing peer update: %v", e) + r.Logger.Error("recovering from processing peer update panic", "err", err) + } + }() + + r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status) + + switch peerUpdate.Status { + case p2p.PeerStatusNew, p2p.PeerStatusUp: + r.blockchainCh.Out() <- p2p.Envelope{ + To: peerUpdate.PeerID, + Message: &bcproto.StatusResponse{ + Base: r.store.Base(), + Height: r.store.Height(), + }, + } + + case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned: + r.pool.RemovePeer(peerUpdate.PeerID) + } + + return err +} + +// processPeerUpdates initiates a blocking process where we listen for and handle +// PeerUpdate messages. When the reactor is stopped, we will catch the signal and +// close the p2p PeerUpdatesCh gracefully. +func (r *Reactor) processPeerUpdates() { + defer r.peerUpdates.Close() + + for { + select { + case peerUpdate := <-r.peerUpdates.Updates(): + _ = r.processPeerUpdate(peerUpdate) + + case <-r.closeCh: + r.Logger.Debug("stopped listening on peer updates channel; closing...") + return + } + } +} + +// ============================================================================ +// ============================================================================ +// ============================================================================ + // SwitchToFastSync is called by the state sync reactor when switching to fast sync. func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error { bcR.fastSync = true @@ -129,50 +288,6 @@ func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error { return nil } -// OnStop implements service.Service. -func (bcR *BlockchainReactor) OnStop() { - if bcR.fastSync { - if err := bcR.pool.Stop(); err != nil { - bcR.Logger.Error("Error stopping pool", "err", err) - } - } -} - -// GetChannels implements Reactor -func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { - ID: BlockchainChannel, - Priority: 5, - SendQueueCapacity: 1000, - RecvBufferCapacity: 50 * 4096, - RecvMessageCapacity: bc.MaxMsgSize, - }, - } -} - -// AddPeer implements Reactor by sending our state to peer. -func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{ - Base: bcR.store.Base(), - Height: bcR.store.Height()}) - if err != nil { - bcR.Logger.Error("could not convert msg to protobuf", "err", err) - return - } - - _ = peer.Send(BlockchainChannel, msgBytes) - // it's OK if send fails. will try later in poolRoutine - - // peer is added to the pool once we receive the first - // bcStatusResponseMessage from the peer and call pool.SetPeerRange -} - -// RemovePeer implements Reactor by removing peer from the pool. -func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - bcR.pool.RemovePeer(peer.ID()) -} - // respondToPeer loads a block and sends it to the requesting peer, // if we have it. Otherwise, we'll respond saying we don't have it. func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, @@ -261,7 +376,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) { +func (r *Reactor) poolRoutine(stateSynced bool) { var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second) diff --git a/statesync/reactor.go b/statesync/reactor.go index 071d4e534..e7a120d55 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -341,7 +341,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err // processSnapshotCh initiates a blocking process where we listen for and handle // envelopes on the SnapshotChannel. Any error encountered during message // execution will result in a PeerError being sent on the SnapshotChannel. When -// the reactor is stopped, we will catch the singal and close the p2p Channel +// the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. func (r *Reactor) processSnapshotCh() { defer r.snapshotCh.Close() @@ -367,7 +367,7 @@ func (r *Reactor) processSnapshotCh() { // processChunkCh initiates a blocking process where we listen for and handle // envelopes on the ChunkChannel. Any error encountered during message // execution will result in a PeerError being sent on the ChunkChannel. When -// the reactor is stopped, we will catch the singal and close the p2p Channel +// the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. func (r *Reactor) processChunkCh() { defer r.chunkCh.Close() @@ -419,7 +419,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) { } // processPeerUpdates initiates a blocking process where we listen for and handle -// PeerUpdate messages. When the reactor is stopped, we will catch the singal and +// PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *Reactor) processPeerUpdates() { defer r.peerUpdates.Close()