diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go new file mode 100644 index 000000000..c9982aafe --- /dev/null +++ b/blockchain/v1/reactor.go @@ -0,0 +1,550 @@ +package v1 + +import ( + "fmt" + "time" + + "github.com/tendermint/tendermint/behaviour" + bc "github.com/tendermint/tendermint/blockchain" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" +) + +const ( + // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) + BlockchainChannel = byte(0x40) + trySyncIntervalMS = 10 + trySendIntervalMS = 10 + + // ask for best height every 10s + statusUpdateIntervalSeconds = 10 +) + +var ( + // Maximum number of requests that can be pending per peer, i.e. for which requests have been sent but blocks + // have not been received. + maxRequestsPerPeer = 20 + // Maximum number of block requests for the reactor, pending or for which blocks have been received. + maxNumRequests = 64 +) + +type consensusReactor interface { + // for when we switch from blockchain reactor and fast sync to + // the consensus machine + SwitchToConsensus(state sm.State, skipWAL bool) +} + +// BlockchainReactor handles long-term catchup syncing. +type BlockchainReactor struct { + p2p.BaseReactor + + initialState sm.State // immutable + state sm.State + + blockExec *sm.BlockExecutor + store *store.BlockStore + + fastSync bool + stateSynced bool + + fsm *BcReactorFSM + blocksSynced uint64 + + // Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine. + messagesForFSMCh chan bcReactorMessage + + // Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed + // to this channel to be processed in the context of the poolRoutine. + errorsForFSMCh chan bcReactorMessage + + // This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and + // the switch. + eventsFromFSMCh chan bcFsmMessage + + swReporter *behaviour.SwitchReporter +} + +// NewBlockchainReactor returns new reactor instance. +func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, + fastSync bool) *BlockchainReactor { + + if state.LastBlockHeight != store.Height() { + panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, + store.Height())) + } + + const capacity = 1000 + eventsFromFSMCh := make(chan bcFsmMessage, capacity) + messagesForFSMCh := make(chan bcReactorMessage, capacity) + errorsForFSMCh := make(chan bcReactorMessage, capacity) + + startHeight := store.Height() + 1 + if startHeight == 1 { + startHeight = state.InitialHeight + } + bcR := &BlockchainReactor{ + initialState: state, + state: state, + blockExec: blockExec, + fastSync: fastSync, + store: store, + messagesForFSMCh: messagesForFSMCh, + eventsFromFSMCh: eventsFromFSMCh, + errorsForFSMCh: errorsForFSMCh, + } + fsm := NewFSM(startHeight, bcR) + bcR.fsm = fsm + bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) + // bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) + + return bcR +} + +// bcReactorMessage is used by the reactor to send messages to the FSM. +type bcReactorMessage struct { + event bReactorEvent + data bReactorEventData +} + +type bFsmEvent uint + +const ( + // message type events + peerErrorEv = iota + 1 + syncFinishedEv +) + +type bFsmEventData struct { + peerID p2p.ID + err error +} + +// bcFsmMessage is used by the FSM to send messages to the reactor +type bcFsmMessage struct { + event bFsmEvent + data bFsmEventData +} + +// SetLogger implements service.Service by setting the logger on reactor and pool. +func (bcR *BlockchainReactor) SetLogger(l log.Logger) { + bcR.BaseService.Logger = l + bcR.fsm.SetLogger(l) +} + +// OnStart implements service.Service. +func (bcR *BlockchainReactor) OnStart() error { + bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) + if bcR.fastSync { + go bcR.poolRoutine() + } + return nil +} + +// OnStop implements service.Service. +func (bcR *BlockchainReactor) OnStop() { + _ = bcR.Stop() +} + +// SwitchToFastSync is called by the state sync reactor when switching to fast sync. +func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error { + bcR.fastSync = true + bcR.initialState = state + bcR.state = state + bcR.stateSynced = true + + bcR.fsm = NewFSM(state.LastBlockHeight+1, bcR) + bcR.fsm.SetLogger(bcR.Logger) + go bcR.poolRoutine() + return nil +} + +// GetChannels implements Reactor +func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + { + ID: BlockchainChannel, + Priority: 10, + SendQueueCapacity: 2000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: bc.MaxMsgSize, + }, + } +} + +// AddPeer implements Reactor by sending our state to peer. +func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { + peer.NewSend(p2p.Envelope{ + ChannelID: BlockchainChannel, + Message: &bcproto.StatusResponse{ + Base: bcR.store.Base(), + Height: bcR.store.Height(), + }, + }) + // it's OK if send fails. will try later in poolRoutine + + // peer is added to the pool once we receive the first + // bcStatusResponseMessage from the peer and call pool.updatePeer() +} + +// sendBlockToPeer loads a block and sends it to the requesting peer. +// If the block doesn't exist a bcNoBlockResponseMessage is sent. +// If all nodes are honest, no node should be requesting for a block that doesn't exist. +func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest, + src p2p.Peer) (queued bool) { + + block := bcR.store.LoadBlock(msg.Height) + if block != nil { + pbbi, err := block.ToProto() + if err != nil { + bcR.Logger.Error("Could not send block message to peer", "err", err) + return false + } + return src.NewTrySend(p2p.Envelope{ + ChannelID: BlockchainChannel, + Message: &bcproto.BlockResponse{Block: pbbi}, + }) + } + + bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height) + + return src.NewTrySend(p2p.Envelope{ + ChannelID: BlockchainChannel, + Message: &bcproto.NoBlockResponse{Height: msg.Height}, + }) +} + +func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcproto.StatusRequest, src p2p.Peer) (queued bool) { + return src.NewTrySend(p2p.Envelope{ + ChannelID: BlockchainChannel, + Message: &bcproto.StatusResponse{ + Base: bcR.store.Base(), + Height: bcR.store.Height(), + }, + }) +} + +// RemovePeer implements Reactor by removing peer from the pool. +func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + msgData := bcReactorMessage{ + event: peerRemoveEv, + data: bReactorEventData{ + peerID: peer.ID(), + err: errSwitchRemovesPeer, + }, + } + bcR.errorsForFSMCh <- msgData +} + +// Receive implements Reactor by handling 4 types of messages (look below). +func (bcR *BlockchainReactor) NewReceive(e p2p.Envelope) { + if err := bc.ValidateMsg(e.Message); err != nil { + bcR.Logger.Error("peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err) + _ = bcR.swReporter.Report(behaviour.BadMessage(e.Src.ID(), err.Error())) + return + } + + bcR.Logger.Debug("Receive", "src", e.Src, "chID", e.ChannelID, "msg", e.Message) + + switch msg := e.Message.(type) { + case *bcproto.BlockRequest: + if queued := bcR.sendBlockToPeer(msg, e.Src); !queued { + // Unfortunately not queued since the queue is full. + bcR.Logger.Error("Could not send block message to peer", "src", e.Src, "height", msg.Height) + } + + case *bcproto.StatusRequest: + // Send peer our state. + if queued := bcR.sendStatusResponseToPeer(msg, e.Src); !queued { + // Unfortunately not queued since the queue is full. + bcR.Logger.Error("Could not send status message to peer", "src", e.Src) + } + + case *bcproto.BlockResponse: + bi, err := types.BlockFromProto(msg.Block) + if err != nil { + bcR.Logger.Error("error transition block from protobuf", "err", err) + return + } + msgForFSM := bcReactorMessage{ + event: blockResponseEv, + data: bReactorEventData{ + peerID: e.Src.ID(), + height: bi.Height, + block: bi, + }, + } + bcR.Logger.Info("Received", "src", e.Src, "height", bi.Height) + bcR.messagesForFSMCh <- msgForFSM + case *bcproto.NoBlockResponse: + msgForFSM := bcReactorMessage{ + event: noBlockResponseEv, + data: bReactorEventData{ + peerID: e.Src.ID(), + height: msg.Height, + }, + } + bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height) + bcR.messagesForFSMCh <- msgForFSM + + case *bcproto.StatusResponse: + // Got a peer status. Unverified. + msgForFSM := bcReactorMessage{ + event: statusResponseEv, + data: bReactorEventData{ + peerID: e.Src.ID(), + height: msg.Height, + }, + } + bcR.messagesForFSMCh <- msgForFSM + + default: + bcR.Logger.Error(fmt.Sprintf("unknown message type %T", msg)) + } +} + +func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + +// processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel +func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) { + + processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + doProcessBlockCh := make(chan struct{}, 1) + + lastHundred := time.Now() + lastRate := 0.0 + +ForLoop: + for { + select { + case <-stopProcessing: + bcR.Logger.Info("finishing block execution") + break ForLoop + case <-processReceivedBlockTicker.C: // try to execute blocks + select { + case doProcessBlockCh <- struct{}{}: + default: + } + case <-doProcessBlockCh: + for { + err := bcR.processBlock() + if err == errMissingBlock { + break + } + // Notify FSM of block processing result. + msgForFSM := bcReactorMessage{ + event: processedBlockEv, + data: bReactorEventData{ + err: err, + }, + } + _ = bcR.fsm.Handle(&msgForFSM) + + if err != nil { + break + } + + bcR.blocksSynced++ + if bcR.blocksSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + height, maxPeerHeight := bcR.fsm.Status() + bcR.Logger.Info("Fast Sync Rate", "height", height, + "max_peer_height", maxPeerHeight, "blocks/s", lastRate) + lastHundred = time.Now() + } + } + } + } +} + +// poolRoutine receives and handles messages from the Receive() routine and from the FSM. +func (bcR *BlockchainReactor) poolRoutine() { + + bcR.fsm.Start() + + sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + + stopProcessing := make(chan struct{}, 1) + go bcR.processBlocksRoutine(stopProcessing) + +ForLoop: + for { + select { + + case <-sendBlockRequestTicker.C: + if !bcR.fsm.NeedsBlocks() { + continue + } + _ = bcR.fsm.Handle(&bcReactorMessage{ + event: makeRequestsEv, + data: bReactorEventData{ + maxNumRequests: maxNumRequests}}) + + case <-statusUpdateTicker.C: + // Ask for status updates. + go bcR.sendStatusRequest() + + case msg := <-bcR.messagesForFSMCh: + // Sent from the Receive() routine when status (statusResponseEv) and + // block (blockResponseEv) response events are received + _ = bcR.fsm.Handle(&msg) + + case msg := <-bcR.errorsForFSMCh: + // Sent from the switch.RemovePeer() routine (RemovePeerEv) and + // FSM state timer expiry routine (stateTimeoutEv). + _ = bcR.fsm.Handle(&msg) + + case msg := <-bcR.eventsFromFSMCh: + switch msg.event { + case syncFinishedEv: + stopProcessing <- struct{}{} + // Sent from the FSM when it enters finished state. + break ForLoop + case peerErrorEv: + // Sent from the FSM when it detects peer error + bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID) + if msg.data.err == errNoPeerResponse { + // Sent from the peer timeout handler routine + _ = bcR.fsm.Handle(&bcReactorMessage{ + event: peerRemoveEv, + data: bReactorEventData{ + peerID: msg.data.peerID, + err: msg.data.err, + }, + }) + } + // else { + // For slow peers, or errors due to blocks received from wrong peer + // the FSM had already removed the peers + // } + default: + bcR.Logger.Error("Event from FSM not supported", "type", msg.event) + } + + case <-bcR.Quit(): + break ForLoop + } + } +} + +func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) { + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + _ = bcR.swReporter.Report(behaviour.BadMessage(peerID, err.Error())) + } +} + +func (bcR *BlockchainReactor) processBlock() error { + + first, second, err := bcR.fsm.FirstTwoBlocks() + if err != nil { + // We need both to sync the first block. + return err + } + + chainID := bcR.initialState.ChainID + + firstParts := first.MakePartSet(types.BlockPartSizeBytes) + firstPartSetHeader := firstParts.Header() + firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} + // Finally, verify the first block using the second's commit + // NOTE: we can probably make this more efficient, but note that calling + // first.Hash() doesn't verify the tx contents, so MakePartSet() is + // currently necessary. + err = bcR.state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit) + if err != nil { + bcR.Logger.Error("error during commit verification", "err", err, + "first", first.Height, "second", second.Height) + return errBlockVerificationFailure + } + + bcR.store.SaveBlock(first, firstParts, second.LastCommit) + + bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) + if err != nil { + panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) + } + + return nil +} + +// Implements bcRNotifier +// sendStatusRequest broadcasts `BlockStore` height. +func (bcR *BlockchainReactor) sendStatusRequest() { + bcR.Switch.NewBroadcast(p2p.Envelope{ + ChannelID: BlockchainChannel, + Message: &bcproto.StatusRequest{}, + }) +} + +// Implements bcRNotifier +// BlockRequest sends `BlockRequest` height. +func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { + peer := bcR.Switch.Peers().Get(peerID) + if peer == nil { + return errNilPeerForBlockRequest + } + + queued := peer.NewTrySend(p2p.Envelope{ + ChannelID: BlockchainChannel, + Message: &bcproto.BlockRequest{Height: height}, + }) + if !queued { + return errSendQueueFull + } + return nil +} + +// Implements bcRNotifier +func (bcR *BlockchainReactor) switchToConsensus() { + conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) + if ok { + conR.SwitchToConsensus(bcR.state, bcR.blocksSynced > 0 || bcR.stateSynced) + bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv} + } + // else { + // Should only happen during testing. + // } +} + +// Implements bcRNotifier +// Called by FSM and pool: +// - pool calls when it detects slow peer or when peer times out +// - FSM calls when: +// - adding a block (addBlock) fails +// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks +func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { + bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err) + msgData := bcFsmMessage{ + event: peerErrorEv, + data: bFsmEventData{ + peerID: peerID, + err: err, + }, + } + bcR.eventsFromFSMCh <- msgData +} + +// Implements bcRNotifier +func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { + if timer == nil { + panic("nil timer pointer parameter") + } + if *timer == nil { + *timer = time.AfterFunc(timeout, func() { + msg := bcReactorMessage{ + event: stateTimeoutEv, + data: bReactorEventData{ + stateName: name, + }, + } + bcR.errorsForFSMCh <- msg + }) + } else { + (*timer).Reset(timeout) + } +} diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go new file mode 100644 index 000000000..fceec1c02 --- /dev/null +++ b/blockchain/v2/reactor.go @@ -0,0 +1,554 @@ +package v2 + +import ( + "errors" + "fmt" + "time" + + "github.com/tendermint/tendermint/behaviour" + bc "github.com/tendermint/tendermint/blockchain" + "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/p2p" + bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +const ( + // chBufferSize is the buffer size of all event channels. + chBufferSize int = 1000 +) + +type blockStore interface { + LoadBlock(height int64) *types.Block + SaveBlock(*types.Block, *types.PartSet, *types.Commit) + Base() int64 + Height() int64 +} + +// BlockchainReactor handles fast sync protocol. +type BlockchainReactor struct { + p2p.BaseReactor + + fastSync bool // if true, enable fast sync on start + stateSynced bool // set to true when SwitchToFastSync is called by state sync + scheduler *Routine + processor *Routine + logger log.Logger + + mtx tmsync.RWMutex + maxPeerHeight int64 + syncHeight int64 + events chan Event // non-nil during a fast sync + + reporter behaviour.Reporter + io iIO + store blockStore +} + +//nolint:unused,deadcode +type blockVerifier interface { + VerifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error +} + +type blockApplier interface { + ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error) +} + +// XXX: unify naming in this package around tmState +func newReactor(state state.State, store blockStore, reporter behaviour.Reporter, + blockApplier blockApplier, fastSync bool) *BlockchainReactor { + initHeight := state.LastBlockHeight + 1 + if initHeight == 1 { + initHeight = state.InitialHeight + } + scheduler := newScheduler(initHeight, time.Now()) + pContext := newProcessorContext(store, blockApplier, state) + // TODO: Fix naming to just newProcesssor + // newPcState requires a processorContext + processor := newPcState(pContext) + + return &BlockchainReactor{ + scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize), + processor: newRoutine("processor", processor.handle, chBufferSize), + store: store, + reporter: reporter, + logger: log.NewNopLogger(), + fastSync: fastSync, + } +} + +// NewBlockchainReactor creates a new reactor instance. +func NewBlockchainReactor( + state state.State, + blockApplier blockApplier, + store blockStore, + fastSync bool) *BlockchainReactor { + reporter := behaviour.NewMockReporter() + return newReactor(state, store, reporter, blockApplier, fastSync) +} + +// SetSwitch implements Reactor interface. +func (r *BlockchainReactor) SetSwitch(sw *p2p.Switch) { + r.Switch = sw + if sw != nil { + r.io = newSwitchIo(sw) + } else { + r.io = nil + } +} + +func (r *BlockchainReactor) setMaxPeerHeight(height int64) { + r.mtx.Lock() + defer r.mtx.Unlock() + if height > r.maxPeerHeight { + r.maxPeerHeight = height + } +} + +func (r *BlockchainReactor) setSyncHeight(height int64) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.syncHeight = height +} + +// SyncHeight returns the height to which the BlockchainReactor has synced. +func (r *BlockchainReactor) SyncHeight() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + return r.syncHeight +} + +// SetLogger sets the logger of the reactor. +func (r *BlockchainReactor) SetLogger(logger log.Logger) { + r.logger = logger + r.scheduler.setLogger(logger) + r.processor.setLogger(logger) +} + +// Start implements cmn.Service interface +func (r *BlockchainReactor) Start() error { + r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch) + if r.fastSync { + err := r.startSync(nil) + if err != nil { + return fmt.Errorf("failed to start fast sync: %w", err) + } + } + return nil +} + +// startSync begins a fast sync, signalled by r.events being non-nil. If state is non-nil, +// the scheduler and processor is updated with this state on startup. +func (r *BlockchainReactor) startSync(state *state.State) error { + r.mtx.Lock() + defer r.mtx.Unlock() + if r.events != nil { + return errors.New("fast sync already in progress") + } + r.events = make(chan Event, chBufferSize) + go r.scheduler.start() + go r.processor.start() + if state != nil { + <-r.scheduler.ready() + <-r.processor.ready() + r.scheduler.send(bcResetState{state: *state}) + r.processor.send(bcResetState{state: *state}) + } + go r.demux(r.events) + return nil +} + +// endSync ends a fast sync +func (r *BlockchainReactor) endSync() { + r.mtx.Lock() + defer r.mtx.Unlock() + if r.events != nil { + close(r.events) + } + r.events = nil + r.scheduler.stop() + r.processor.stop() +} + +// SwitchToFastSync is called by the state sync reactor when switching to fast sync. +func (r *BlockchainReactor) SwitchToFastSync(state state.State) error { + r.stateSynced = true + state = state.Copy() + return r.startSync(&state) +} + +// reactor generated ticker events: +// ticker for cleaning peers +type rTryPrunePeer struct { + priorityHigh + time time.Time +} + +func (e rTryPrunePeer) String() string { + return fmt.Sprintf("rTryPrunePeer{%v}", e.time) +} + +// ticker event for scheduling block requests +type rTrySchedule struct { + priorityHigh + time time.Time +} + +func (e rTrySchedule) String() string { + return fmt.Sprintf("rTrySchedule{%v}", e.time) +} + +// ticker for block processing +type rProcessBlock struct { + priorityNormal +} + +func (e rProcessBlock) String() string { + return "rProcessBlock" +} + +// reactor generated events based on blockchain related messages from peers: +// blockResponse message received from a peer +type bcBlockResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + block *types.Block +} + +func (resp bcBlockResponse) String() string { + return fmt.Sprintf("bcBlockResponse{%d#%X from %v at %v}", + resp.block.Height, resp.block.Hash(), resp.peerID, resp.time) +} + +// blockNoResponse message received from a peer +type bcNoBlockResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 +} + +func (resp bcNoBlockResponse) String() string { + return fmt.Sprintf("bcNoBlockResponse{%v has no block at height %d at %v}", + resp.peerID, resp.height, resp.time) +} + +// statusResponse message received from a peer +type bcStatusResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + base int64 + height int64 +} + +func (resp bcStatusResponse) String() string { + return fmt.Sprintf("bcStatusResponse{%v is at height %d (base: %d) at %v}", + resp.peerID, resp.height, resp.base, resp.time) +} + +// new peer is connected +type bcAddNewPeer struct { + priorityNormal + peerID p2p.ID +} + +func (resp bcAddNewPeer) String() string { + return fmt.Sprintf("bcAddNewPeer{%v}", resp.peerID) +} + +// existing peer is removed +type bcRemovePeer struct { + priorityHigh + peerID p2p.ID + reason interface{} +} + +func (resp bcRemovePeer) String() string { + return fmt.Sprintf("bcRemovePeer{%v due to %v}", resp.peerID, resp.reason) +} + +// resets the scheduler and processor state, e.g. following a switch from state syncing +type bcResetState struct { + priorityHigh + state state.State +} + +func (e bcResetState) String() string { + return fmt.Sprintf("bcResetState{%v}", e.state) +} + +// Takes the channel as a parameter to avoid race conditions on r.events. +func (r *BlockchainReactor) demux(events <-chan Event) { + var lastRate = 0.0 + var lastHundred = time.Now() + + var ( + processBlockFreq = 20 * time.Millisecond + doProcessBlockCh = make(chan struct{}, 1) + doProcessBlockTk = time.NewTicker(processBlockFreq) + ) + defer doProcessBlockTk.Stop() + + var ( + prunePeerFreq = 1 * time.Second + doPrunePeerCh = make(chan struct{}, 1) + doPrunePeerTk = time.NewTicker(prunePeerFreq) + ) + defer doPrunePeerTk.Stop() + + var ( + scheduleFreq = 20 * time.Millisecond + doScheduleCh = make(chan struct{}, 1) + doScheduleTk = time.NewTicker(scheduleFreq) + ) + defer doScheduleTk.Stop() + + var ( + statusFreq = 10 * time.Second + doStatusCh = make(chan struct{}, 1) + doStatusTk = time.NewTicker(statusFreq) + ) + defer doStatusTk.Stop() + doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers + + // XXX: Extract timers to make testing atemporal + for { + select { + // Pacers: send at most per frequency but don't saturate + case <-doProcessBlockTk.C: + select { + case doProcessBlockCh <- struct{}{}: + default: + } + case <-doPrunePeerTk.C: + select { + case doPrunePeerCh <- struct{}{}: + default: + } + case <-doScheduleTk.C: + select { + case doScheduleCh <- struct{}{}: + default: + } + case <-doStatusTk.C: + select { + case doStatusCh <- struct{}{}: + default: + } + + // Tickers: perform tasks periodically + case <-doScheduleCh: + r.scheduler.send(rTrySchedule{time: time.Now()}) + case <-doPrunePeerCh: + r.scheduler.send(rTryPrunePeer{time: time.Now()}) + case <-doProcessBlockCh: + r.processor.send(rProcessBlock{}) + case <-doStatusCh: + r.io.broadcastStatusRequest() + + // Events from peers. Closing the channel signals event loop termination. + case event, ok := <-events: + if !ok { + r.logger.Info("Stopping event processing") + return + } + switch event := event.(type) { + case bcStatusResponse: + r.setMaxPeerHeight(event.height) + r.scheduler.send(event) + case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: + r.scheduler.send(event) + default: + r.logger.Error("Received unexpected event", "event", fmt.Sprintf("%T", event)) + } + + // Incremental events from scheduler + case event := <-r.scheduler.next(): + switch event := event.(type) { + case scBlockReceived: + r.processor.send(event) + case scPeerError: + r.processor.send(event) + if err := r.reporter.Report(behaviour.BadMessage(event.peerID, "scPeerError")); err != nil { + r.logger.Error("Error reporting peer", "err", err) + } + case scBlockRequest: + if err := r.io.sendBlockRequest(event.peerID, event.height); err != nil { + r.logger.Error("Error sending block request", "err", err) + } + case scFinishedEv: + r.processor.send(event) + r.scheduler.stop() + case scSchedulerFail: + r.logger.Error("Scheduler failure", "err", event.reason.Error()) + case scPeersPruned: + // Remove peers from the processor. + for _, peerID := range event.peers { + r.processor.send(scPeerError{peerID: peerID, reason: errors.New("peer was pruned")}) + } + r.logger.Debug("Pruned peers", "count", len(event.peers)) + case noOpEvent: + default: + r.logger.Error("Received unexpected scheduler event", "event", fmt.Sprintf("%T", event)) + } + + // Incremental events from processor + case event := <-r.processor.next(): + switch event := event.(type) { + case pcBlockProcessed: + r.setSyncHeight(event.height) + if r.syncHeight%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + r.logger.Info("Fast Sync Rate", "height", r.syncHeight, + "max_peer_height", r.maxPeerHeight, "blocks/s", lastRate) + lastHundred = time.Now() + } + r.scheduler.send(event) + case pcBlockVerificationFailure: + r.scheduler.send(event) + case pcFinished: + r.logger.Info("Fast sync complete, switching to consensus") + if !r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0 || r.stateSynced) { + r.logger.Error("Failed to switch to consensus reactor") + } + r.endSync() + return + case noOpEvent: + default: + r.logger.Error("Received unexpected processor event", "event", fmt.Sprintf("%T", event)) + } + + // Terminal event from scheduler + case err := <-r.scheduler.final(): + switch err { + case nil: + r.logger.Info("Scheduler stopped") + default: + r.logger.Error("Scheduler aborted with error", "err", err) + } + + // Terminal event from processor + case err := <-r.processor.final(): + switch err { + case nil: + r.logger.Info("Processor stopped") + default: + r.logger.Error("Processor aborted with error", "err", err) + } + } + } +} + +// Stop implements cmn.Service interface. +func (r *BlockchainReactor) Stop() error { + r.logger.Info("reactor stopping") + r.endSync() + r.logger.Info("reactor stopped") + return nil +} + +// Receive implements Reactor by handling different message types. +func (r *BlockchainReactor) NewReceive(e p2p.Envelope) { + if err := bc.ValidateMsg(e.Message); err != nil { + r.logger.Error("peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err) + _ = r.reporter.Report(behaviour.BadMessage(e.Src.ID(), err.Error())) + return + } + + r.logger.Debug("Receive", "src", e.Src.ID(), "chID", e.ChannelID, "msg", e.Message) + + switch msg := e.Message.(type) { + case *bcproto.StatusRequest: + if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), e.Src.ID()); err != nil { + r.logger.Error("Could not send status message to peer", "src", e.Src) + } + + case *bcproto.BlockRequest: + block := r.store.LoadBlock(msg.Height) + if block != nil { + if err := r.io.sendBlockToPeer(block, e.Src.ID()); err != nil { + r.logger.Error("Could not send block message to peer: ", err) + } + } else { + r.logger.Info("peer asking for a block we don't have", "src", e.Src, "height", msg.Height) + peerID := e.Src.ID() + if err := r.io.sendBlockNotFound(msg.Height, peerID); err != nil { + r.logger.Error("Couldn't send block not found: ", err) + } + } + + case *bcproto.StatusResponse: + r.mtx.RLock() + if r.events != nil { + r.events <- bcStatusResponse{peerID: e.Src.ID(), base: msg.Base, height: msg.Height} + } + r.mtx.RUnlock() + + case *bcproto.BlockResponse: + bi, err := types.BlockFromProto(msg.Block) + if err != nil { + r.logger.Error("error transitioning block from protobuf", "err", err) + return + } + r.mtx.RLock() + if r.events != nil { + r.events <- bcBlockResponse{ + peerID: e.Src.ID(), + block: bi, + time: time.Now(), + } + } + r.mtx.RUnlock() + + case *bcproto.NoBlockResponse: + r.mtx.RLock() + if r.events != nil { + r.events <- bcNoBlockResponse{peerID: e.Src.ID(), height: msg.Height, time: time.Now()} + } + r.mtx.RUnlock() + } +} + +func (r *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + +// AddPeer implements Reactor interface +func (r *BlockchainReactor) AddPeer(peer p2p.Peer) { + err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer.ID()) + if err != nil { + r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight()) + } + r.mtx.RLock() + defer r.mtx.RUnlock() + if r.events != nil { + r.events <- bcAddNewPeer{peerID: peer.ID()} + } +} + +// RemovePeer implements Reactor interface. +func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + r.mtx.RLock() + defer r.mtx.RUnlock() + if r.events != nil { + r.events <- bcRemovePeer{ + peerID: peer.ID(), + reason: reason, + } + } +} + +// GetChannels implements Reactor +func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + { + ID: BlockchainChannel, + Priority: 5, + SendQueueCapacity: 2000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: bc.MaxMsgSize, + }, + } +} diff --git a/blocksync/reactor.go b/blocksync/reactor.go index b1efd08a5..f3d14f06c 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -233,6 +233,8 @@ func (bcR *Reactor) NewReceive(e p2p.Envelope) { } } +func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! func (bcR *Reactor) poolRoutine(stateSynced bool) { diff --git a/consensus/reactor.go b/consensus/reactor.go index 1a40df5c3..40d71236b 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -387,6 +387,8 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { } } +func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + // SetEventBus sets event bus. func (conR *Reactor) SetEventBus(b *types.EventBus) { conR.eventBus = b diff --git a/evidence/reactor.go b/evidence/reactor.go index dd00377fd..8d7cef24b 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -92,6 +92,8 @@ func (evR *Reactor) NewReceive(e p2p.Envelope) { } } +func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + // SetEventBus implements events.Eventable. func (evR *Reactor) SetEventBus(b *types.EventBus) { evR.eventBus = b diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index e02a84724..c412375f2 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -187,6 +187,8 @@ func (memR *Reactor) NewReceive(e p2p.Envelope) { // broadcasting happens from go routines per peer } +func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index cbcf57404..80e995e56 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -82,5 +82,7 @@ func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) NewReceive(e Envelope) {} + +func (*BaseReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}} func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 6c042b513..7e4d608d2 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -23,3 +23,5 @@ func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Chann func (r *Reactor) AddPeer(peer p2p.Peer) {} func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} func (r *Reactor) NewReceive(e p2p.Envelope) {} + +func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}} diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index b17c08607..926da8633 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -298,6 +298,8 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } } +func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + // enforces a minimum amount of time between requests func (r *Reactor) receiveRequest(src Peer) error { id := string(src.ID()) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 28bcbf890..9b95d7cfb 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cosmos/gogoproto/proto" + "github.com/ethereum/go-ethereum/p2p" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -81,6 +82,8 @@ func (tr *TestReactor) NewReceive(e Envelope) { } } +func (tr *TestReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { tr.mtx.Lock() defer tr.mtx.Unlock() diff --git a/statesync/reactor.go b/statesync/reactor.go index 4549bc552..7b80b3b34 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -223,6 +223,8 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } } +func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + // recentSnapshots fetches the n most recent snapshots from the app func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) { resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{}) diff --git a/test/maverick/consensus/reactor.go b/test/maverick/consensus/reactor.go new file mode 100644 index 000000000..789b478cf --- /dev/null +++ b/test/maverick/consensus/reactor.go @@ -0,0 +1,1470 @@ +package consensus + +import ( + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/gogo/protobuf/proto" + + tmcon "github.com/tendermint/tendermint/consensus" + cstypes "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/libs/bits" + tmevents "github.com/tendermint/tendermint/libs/events" + tmjson "github.com/tendermint/tendermint/libs/json" + "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/p2p" + tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" + tmtime "github.com/tendermint/tendermint/types/time" +) + +const ( + StateChannel = byte(0x20) + DataChannel = byte(0x21) + VoteChannel = byte(0x22) + VoteSetBitsChannel = byte(0x23) + + maxMsgSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes. + + blocksToContributeToBecomeGoodPeer = 10000 + votesToContributeToBecomeGoodPeer = 10000 +) + +//----------------------------------------------------------------------------- + +// Reactor defines a reactor for the consensus service. +type Reactor struct { + p2p.BaseReactor // BaseService + p2p.Switch + + conS *State + + mtx tmsync.RWMutex + waitSync bool + eventBus *types.EventBus + + Metrics *tmcon.Metrics +} + +type ReactorOption func(*Reactor) + +// NewReactor returns a new Reactor with the given +// consensusState. +func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor { + conR := &Reactor{ + conS: consensusState, + waitSync: waitSync, + Metrics: tmcon.NopMetrics(), + } + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) + + for _, option := range options { + option(conR) + } + + return conR +} + +// OnStart implements BaseService by subscribing to events, which later will be +// broadcasted to other peers and starting state if we're not in fast sync. +func (conR *Reactor) OnStart() error { + conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync()) + + // start routine that computes peer statistics for evaluating peer quality + go conR.peerStatsRoutine() + + conR.subscribeToBroadcastEvents() + + if !conR.WaitSync() { + conR.conS.SetSwitch(conR.Switch) + err := conR.conS.Start() + if err != nil { + return err + } + } + + return nil +} + +// OnStop implements BaseService by unsubscribing from events and stopping +// state. +func (conR *Reactor) OnStop() { + conR.unsubscribeFromBroadcastEvents() + if err := conR.conS.Stop(); err != nil { + conR.Logger.Error("Error stopping consensus state", "err", err) + } + if !conR.WaitSync() { + conR.conS.Wait() + } +} + +// SwitchToConsensus switches from fast_sync mode to consensus mode. +// It resets the state, turns off fast_sync, and starts the consensus state-machine +func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) { + conR.Logger.Info("SwitchToConsensus") + + // We have no votes, so reconstruct LastCommit from SeenCommit. + if state.LastBlockHeight > 0 { + conR.conS.reconstructLastCommit(state) + } + + // NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a + // NewRoundStepMessage. + conR.conS.updateToState(state) + + conR.mtx.Lock() + conR.waitSync = false + conR.mtx.Unlock() + conR.Metrics.FastSyncing.Set(0) + conR.Metrics.StateSyncing.Set(0) + + if skipWAL { + conR.conS.doWALCatchup = false + } + conR.conS.SetSwitch(conR.Switch) + err := conR.conS.Start() + if err != nil { + panic(fmt.Sprintf(`Failed to start consensus state: %v + +conS: +%+v + +conR: +%+v`, err, conR.conS, conR)) + } +} + +// GetChannels implements Reactor +func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor { + // TODO optimize + return []*p2p.ChannelDescriptor{ + { + ID: StateChannel, + Priority: 6, + SendQueueCapacity: 100, + RecvMessageCapacity: maxMsgSize, + }, + { + ID: DataChannel, // maybe split between gossiping current block and catchup stuff + // once we gossip the whole block there's nothing left to send until next height or round + Priority: 10, + SendQueueCapacity: 100, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: maxMsgSize, + }, + { + ID: VoteChannel, + Priority: 7, + SendQueueCapacity: 100, + RecvBufferCapacity: 100 * 100, + RecvMessageCapacity: maxMsgSize, + }, + { + ID: VoteSetBitsChannel, + Priority: 1, + SendQueueCapacity: 2, + RecvBufferCapacity: 1024, + RecvMessageCapacity: maxMsgSize, + }, + } +} + +// InitPeer implements Reactor by creating a state for the peer. +func (conR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { + peerState := NewPeerState(peer).SetLogger(conR.Logger) + peer.Set(types.PeerStateKey, peerState) + return peer +} + +// AddPeer implements Reactor by spawning multiple gossiping goroutines for the +// peer. +func (conR *Reactor) AddPeer(peer p2p.Peer) { + if !conR.IsRunning() { + return + } + + peerState, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("peer %v has no state", peer)) + } + // Begin routines for this peer. + go conR.gossipDataRoutine(peer, peerState) + go conR.gossipVotesRoutine(peer, peerState) + go conR.queryMaj23Routine(peer, peerState) + + // Send our state to peer. + // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). + if !conR.WaitSync() { + conR.sendNewRoundStepMessage(peer) + } +} + +// RemovePeer is a noop. +func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { + if !conR.IsRunning() { + return + } + // TODO + // ps, ok := peer.Get(PeerStateKey).(*PeerState) + // if !ok { + // panic(fmt.Sprintf("Peer %v has no state", peer)) + // } + // ps.Disconnect() +} + +// Receive implements Reactor +// NOTE: We process these messages even when we're fast_syncing. +// Messages affect either a peer state or the consensus state. +// Peer state updates can happen in parallel, but processing of +// proposals, block parts, and votes are ordered by the receiveRoutine +// NOTE: blocks on consensus state for proposals, block parts, and votes +func (conR *Reactor) NewReceive(e p2p.Envelope) { + if !conR.IsRunning() { + conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID) + return + } + msg, err := tmcon.MsgFromProto(e.Message) + if err != nil { + conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) + conR.Switch.StopPeerForError(e.Src, err) + return + } + + if err := msg.ValidateBasic(); err != nil { + conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err) + conR.Switch.StopPeerForError(e.Src, err) + return + } + + conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + + // Get peer states + ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", e.Src)) + } + + switch e.ChannelID { + case StateChannel: + switch msg := msg.(type) { + case *tmcon.NewRoundStepMessage: + conR.conS.mtx.Lock() + initialHeight := conR.conS.state.InitialHeight + conR.conS.mtx.Unlock() + if err = msg.ValidateHeight(initialHeight); err != nil { + conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err) + conR.Switch.StopPeerForError(e.Src, err) + return + } + ps.ApplyNewRoundStepMessage(msg) + case *tmcon.NewValidBlockMessage: + ps.ApplyNewValidBlockMessage(msg) + case *tmcon.HasVoteMessage: + ps.ApplyHasVoteMessage(msg) + case *tmcon.VoteSetMaj23Message: + cs := conR.conS + cs.mtx.Lock() + height, votes := cs.Height, cs.Votes + cs.mtx.Unlock() + if height != msg.Height { + return + } + // Peer claims to have a maj23 for some BlockID at H,R,S, + err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID) + if err != nil { + conR.Switch.StopPeerForError(e.Src, err) + return + } + // Respond with a VoteSetBitsMessage showing which votes we have. + // (and consequently shows which we don't have) + var ourVotes *bits.BitArray + switch msg.Type { + case tmproto.PrevoteType: + ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case tmproto.PrecommitType: + ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") + } + m := &tmcons.VoteSetBits{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID.ToProto(), + } + v := ourVotes.ToProto() + if v != nil { + m.Votes = *v + } + + e.Src.NewTrySend(p2p.Envelope{ + ChannelID: VoteSetBitsChannel, + Message: m, + }) + default: + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + case DataChannel: + if conR.WaitSync() { + conR.Logger.Info("Ignoring message received during sync", "msg", msg) + return + } + switch msg := msg.(type) { + case *tmcon.ProposalMessage: + ps.SetHasProposal(msg.Proposal) + conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()} + case *tmcon.ProposalPOLMessage: + ps.ApplyProposalPOLMessage(msg) + case *tmcon.BlockPartMessage: + ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) + conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) + conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()} + default: + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + case VoteChannel: + if conR.WaitSync() { + conR.Logger.Info("Ignoring message received during sync", "msg", msg) + return + } + switch msg := msg.(type) { + case *tmcon.VoteMessage: + cs := conR.conS + cs.mtx.RLock() + height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() + cs.mtx.RUnlock() + ps.EnsureVoteBitArrays(height, valSize) + ps.EnsureVoteBitArrays(height-1, lastCommitSize) + ps.SetHasVote(msg.Vote) + + cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()} + + default: + // don't punish (leave room for soft upgrades) + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + case VoteSetBitsChannel: + if conR.WaitSync() { + conR.Logger.Info("Ignoring message received during sync", "msg", msg) + return + } + switch msg := msg.(type) { + case *tmcon.VoteSetBitsMessage: + cs := conR.conS + cs.mtx.Lock() + height, votes := cs.Height, cs.Votes + cs.mtx.Unlock() + + if height == msg.Height { + var ourVotes *bits.BitArray + switch msg.Type { + case tmproto.PrevoteType: + ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case tmproto.PrecommitType: + ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") + } + ps.ApplyVoteSetBitsMessage(msg, ourVotes) + } else { + ps.ApplyVoteSetBitsMessage(msg, nil) + } + default: + // don't punish (leave room for soft upgrades) + conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } + + default: + conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID)) + } +} + +func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} + +// SetEventBus sets event bus. +func (conR *Reactor) SetEventBus(b *types.EventBus) { + conR.eventBus = b + conR.conS.SetEventBus(b) +} + +// WaitSync returns whether the consensus reactor is waiting for state/fast sync. +func (conR *Reactor) WaitSync() bool { + conR.mtx.RLock() + defer conR.mtx.RUnlock() + return conR.waitSync +} + +//-------------------------------------- + +// subscribeToBroadcastEvents subscribes for new round steps and votes +// using internal pubsub defined on state to broadcast +// them to peers upon receiving. +func (conR *Reactor) subscribeToBroadcastEvents() { + const subscriber = "consensus-reactor" + if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, + func(data tmevents.EventData) { + conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) + }); err != nil { + conR.Logger.Error("Error adding listener for events", "err", err) + } + + if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock, + func(data tmevents.EventData) { + conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) + }); err != nil { + conR.Logger.Error("Error adding listener for events", "err", err) + } + + if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, + func(data tmevents.EventData) { + conR.broadcastHasVoteMessage(data.(*types.Vote)) + }); err != nil { + conR.Logger.Error("Error adding listener for events", "err", err) + } + +} + +func (conR *Reactor) unsubscribeFromBroadcastEvents() { + const subscriber = "consensus-reactor" + conR.conS.evsw.RemoveListener(subscriber) +} + +func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { + conR.Switch.NewBroadcast(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.NewRoundStep{ + Height: rs.Height, + Round: rs.Round, + Step: uint32(rs.Step), + SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()), + LastCommitRound: rs.LastCommit.GetRound(), + }, + }) +} + +func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { + psh := rs.ProposalBlockParts.Header() + conR.Switch.NewBroadcast(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.NewValidBlock{ + Height: rs.Height, + Round: rs.Round, + BlockPartSetHeader: psh.ToProto(), + BlockParts: rs.ProposalBlockParts.BitArray().ToProto(), + IsCommit: rs.Step == cstypes.RoundStepCommit, + }, + }) +} + +// Broadcasts HasVoteMessage to peers that care. +func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { + conR.Switch.NewBroadcast(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.HasVote{ + Height: vote.Height, + Round: vote.Round, + Type: vote.Type, + Index: vote.ValidatorIndex, + }, + }) + /* + // TODO: Make this broadcast more selective. + for _, peer := range conR.Switch.Peers().List() { + ps, ok := peer.Get(PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } + prs := ps.GetRoundState() + if prs.Height == vote.Height { + // TODO: Also filter on round? + peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg}) + } else { + // Height doesn't match + // TODO: check a field, maybe CatchupCommitRound? + // TODO: But that requires changing the struct field comment. + } + } + */ +} + +func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { + rs := conR.conS.GetRoundState() + peer.NewTrySend(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.NewRoundStep{ + Height: rs.Height, + Round: rs.Round, + Step: uint32(rs.Step), + SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()), + LastCommitRound: rs.LastCommit.GetRound(), + }, + }) +} + +func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { + logger := conR.Logger.With("peer", peer) + +OUTER_LOOP: + for { + // Manage disconnects from self or peer. + if !peer.IsRunning() || !conR.IsRunning() { + logger.Info("Stopping gossipDataRoutine for peer") + return + } + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + + // Send proposal Block parts? + if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) { + if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { + part := rs.ProposalBlockParts.GetPart(index) + logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) + p, err := part.ToProto() + if err != nil { + panic(err) + } + if peer.NewTrySend(p2p.Envelope{ + ChannelID: DataChannel, + Message: &tmcons.BlockPart{ + Height: rs.Height, // This tells peer that this part applies to us. + Round: rs.Round, // This tells peer that this part applies to us. + Part: *p, + }, + }) { + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + } + continue OUTER_LOOP + } + } + + // If the peer is on a previous height that we have, help catch up. + if (0 < prs.Height) && (prs.Height < rs.Height) && (prs.Height >= conR.conS.blockStore.Base()) { + heightLogger := logger.With("height", prs.Height) + + // if we never received the commit message from the peer, the block parts wont be initialized + if prs.ProposalBlockParts == nil { + blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) + if blockMeta == nil { + heightLogger.Error("Failed to load block meta", + "blockstoreBase", conR.conS.blockStore.Base(), "blockstoreHeight", conR.conS.blockStore.Height()) + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + } else { + ps.InitProposalBlockParts(blockMeta.BlockID.PartSetHeader) + } + // continue the loop since prs is a copy and not effected by this initialization + continue OUTER_LOOP + } + conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer) + continue OUTER_LOOP + } + + // If height and round don't match, sleep. + if (rs.Height != prs.Height) || (rs.Round != prs.Round) { + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + continue OUTER_LOOP + } + + // By here, height and round match. + // Proposal block parts were already matched and sent if any were wanted. + // (These can match on hash so the round doesn't matter) + // Now consider sending other things, like the Proposal itself. + + // Send Proposal && ProposalPOL BitArray? + if rs.Proposal != nil && !prs.Proposal { + // Proposal: share the proposal metadata with peer. + { + msg := &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()} + logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) + if peer.NewSend(p2p.Envelope{ + ChannelID: DataChannel, + Message: msg, + }) { + // NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected! + ps.SetHasProposal(rs.Proposal) + } + } + // ProposalPOL: lets peer know which POL votes we have so far. + // Peer must receive ProposalMessage first. + // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round, + // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound). + if 0 <= rs.Proposal.POLRound { + msg := &tmcons.ProposalPOL{ + Height: rs.Height, + ProposalPolRound: rs.Proposal.POLRound, + ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(), + } + logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) + peer.NewSend(p2p.Envelope{ + ChannelID: DataChannel, + Message: msg, + }) + } + continue OUTER_LOOP + } + + // Nothing to do. Sleep. + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + continue OUTER_LOOP + } +} + +func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState, + prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) { + + if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { + // Ensure that the peer's PartSetHeader is correct + blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) + if blockMeta == nil { + logger.Error("Failed to load block meta", "ourHeight", rs.Height, + "blockstoreBase", conR.conS.blockStore.Base(), "blockstoreHeight", conR.conS.blockStore.Height()) + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + return + } else if !blockMeta.BlockID.PartSetHeader.Equals(prs.ProposalBlockPartSetHeader) { + logger.Info("Peer ProposalBlockPartSetHeader mismatch, sleeping", + "blockPartSetHeader", blockMeta.BlockID.PartSetHeader, "peerBlockPartSetHeader", prs.ProposalBlockPartSetHeader) + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + return + } + // Load the part + part := conR.conS.blockStore.LoadBlockPart(prs.Height, index) + if part == nil { + logger.Error("Could not load part", "index", index, + "blockPartSetHeader", blockMeta.BlockID.PartSetHeader, "peerBlockPartSetHeader", prs.ProposalBlockPartSetHeader) + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + return + } + // Send the part + + pp, err := part.ToProto() + if err != nil { + logger.Error("Could not convert part to proto", "index", index, "error", err) + return + } + logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index) + if peer.NewSend(p2p.Envelope{ + ChannelID: DataChannel, + Message: &tmcons.BlockPart{ + Height: prs.Height, // Not our height, so it doesn't matter. + Round: prs.Round, // Not our height, so it doesn't matter. + Part: *pp, + }, + }) { + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + } else { + logger.Debug("Sending block part for catchup failed") + } + return + } + time.Sleep(conR.conS.config.PeerGossipSleepDuration) +} + +func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { + logger := conR.Logger.With("peer", peer) + + // Simple hack to throttle logs upon sleep. + var sleeping = 0 + +OUTER_LOOP: + for { + // Manage disconnects from self or peer. + if !peer.IsRunning() || !conR.IsRunning() { + logger.Info("Stopping gossipVotesRoutine for peer") + return + } + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + + switch sleeping { + case 1: // First sleep + sleeping = 2 + case 2: // No more sleep + sleeping = 0 + } + + // If height matches, then send LastCommit, Prevotes, Precommits. + if rs.Height == prs.Height { + heightLogger := logger.With("height", prs.Height) + if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) { + continue OUTER_LOOP + } + } + + // Special catchup logic. + // If peer is lagging by height 1, send LastCommit. + if prs.Height != 0 && rs.Height == prs.Height+1 { + if ps.PickSendVote(rs.LastCommit) { + logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) + continue OUTER_LOOP + } + } + + // Catchup logic + // If peer is lagging by more than 1, send Commit. + if prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= conR.conS.blockStore.Base() { + // Load the block commit for prs.Height, + // which contains precommit signatures for prs.Height. + if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil { + if ps.PickSendVote(commit) { + logger.Debug("Picked Catchup commit to send", "height", prs.Height) + continue OUTER_LOOP + } + } + } + + if sleeping == 0 { + // We sent nothing. Sleep... + sleeping = 1 + logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, + "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, + "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) + } else if sleeping == 2 { + // Continued sleep... + sleeping = 1 + } + + time.Sleep(conR.conS.config.PeerGossipSleepDuration) + continue OUTER_LOOP + } +} + +func (conR *Reactor) gossipVotesForHeight( + logger log.Logger, + rs *cstypes.RoundState, + prs *cstypes.PeerRoundState, + ps *PeerState, +) bool { + + // If there are lastCommits to send... + if prs.Step == cstypes.RoundStepNewHeight { + if ps.PickSendVote(rs.LastCommit) { + logger.Debug("Picked rs.LastCommit to send") + return true + } + } + // If there are POL prevotes to send... + if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 { + if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { + if ps.PickSendVote(polPrevotes) { + logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", + "round", prs.ProposalPOLRound) + return true + } + } + } + // If there are prevotes to send... + if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are precommits to send... + if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are prevotes to send...Needed because of validBlock mechanism + if prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are POLPrevotes to send... + if prs.ProposalPOLRound != -1 { + if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { + if ps.PickSendVote(polPrevotes) { + logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", + "round", prs.ProposalPOLRound) + return true + } + } + } + + return false +} + +// NOTE: `queryMaj23Routine` has a simple crude design since it only comes +// into play for liveness when there's a signature DDoS attack happening. +func (conR *Reactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) { + logger := conR.Logger.With("peer", peer) + +OUTER_LOOP: + for { + // Manage disconnects from self or peer. + if !peer.IsRunning() || !conR.IsRunning() { + logger.Info("Stopping queryMaj23Routine for peer") + return + } + + // Maybe send Height/Round/Prevotes + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if rs.Height == prs.Height { + if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { + + peer.NewTrySend(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: prs.Round, + Type: tmproto.PrevoteType, + BlockID: maj23.ToProto(), + }}) + + time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) + } + } + } + + // Maybe send Height/Round/Precommits + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if rs.Height == prs.Height { + if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { + peer.NewTrySend(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: prs.Round, + Type: tmproto.PrecommitType, + BlockID: maj23.ToProto(), + }}) + time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) + } + } + } + + // Maybe send Height/Round/ProposalPOL + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { + if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { + peer.NewTrySend(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: prs.ProposalPOLRound, + Type: tmproto.PrevoteType, + BlockID: maj23.ToProto(), + }}) + time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) + } + } + } + + // Little point sending LastCommitRound/LastCommit, + // These are fleeting and non-blocking. + + // Maybe send Height/CatchupCommitRound/CatchupCommit. + { + prs := ps.GetRoundState() + if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && + prs.Height >= conR.conS.blockStore.Base() { + if commit := conR.conS.LoadCommit(prs.Height); commit != nil { + peer.NewTrySend(p2p.Envelope{ + ChannelID: StateChannel, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: commit.Round, + Type: tmproto.PrecommitType, + BlockID: commit.BlockID.ToProto(), + }}) + time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) + } + } + } + + time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration) + + continue OUTER_LOOP + } +} + +func (conR *Reactor) peerStatsRoutine() { + for { + if !conR.IsRunning() { + conR.Logger.Info("Stopping peerStatsRoutine") + return + } + + select { + case msg := <-conR.conS.statsMsgQueue: + // Get peer + peer := conR.Switch.Peers().Get(msg.PeerID) + if peer == nil { + conR.Logger.Debug("Attempt to update stats for non-existent peer", + "peer", msg.PeerID) + continue + } + // Get peer state + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } + switch msg.Msg.(type) { + case *tmcon.VoteMessage: + if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { + conR.Switch.MarkPeerAsGood(peer) + } + case *tmcon.BlockPartMessage: + if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 { + conR.Switch.MarkPeerAsGood(peer) + } + } + case <-conR.conS.Quit(): + return + + case <-conR.Quit(): + return + } + } +} + +// String returns a string representation of the Reactor. +// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables. +// TODO: improve! +func (conR *Reactor) String() string { + // better not to access shared variables + return "ConsensusReactor" // conR.StringIndented("") +} + +// StringIndented returns an indented string representation of the Reactor +func (conR *Reactor) StringIndented(indent string) string { + s := "ConsensusReactor{\n" + s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" + for _, peer := range conR.Switch.Peers().List() { + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } + s += indent + " " + ps.StringIndented(indent+" ") + "\n" + } + s += indent + "}" + return s +} + +// ReactorMetrics sets the metrics +func ReactorMetrics(metrics *tmcon.Metrics) ReactorOption { + return func(conR *Reactor) { conR.Metrics = metrics } +} + +//----------------------------------------------------------------------------- + +var ( + ErrPeerStateHeightRegression = errors.New("error peer state height regression") + ErrPeerStateInvalidStartTime = errors.New("error peer state invalid startTime") +) + +// PeerState contains the known state of a peer, including its connection and +// threadsafe access to its PeerRoundState. +// NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go. +// Be mindful of what you Expose. +type PeerState struct { + peer p2p.Peer + logger log.Logger + + mtx sync.Mutex // NOTE: Modify below using setters, never directly. + PRS cstypes.PeerRoundState `json:"round_state"` // Exposed. + Stats *peerStateStats `json:"stats"` // Exposed. +} + +// peerStateStats holds internal statistics for a peer. +type peerStateStats struct { + Votes int `json:"votes"` + BlockParts int `json:"block_parts"` +} + +func (pss peerStateStats) String() string { + return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", + pss.Votes, pss.BlockParts) +} + +// NewPeerState returns a new PeerState for the given Peer +func NewPeerState(peer p2p.Peer) *PeerState { + return &PeerState{ + peer: peer, + logger: log.NewNopLogger(), + PRS: cstypes.PeerRoundState{ + Round: -1, + ProposalPOLRound: -1, + LastCommitRound: -1, + CatchupCommitRound: -1, + }, + Stats: &peerStateStats{}, + } +} + +// SetLogger allows to set a logger on the peer state. Returns the peer state +// itself. +func (ps *PeerState) SetLogger(logger log.Logger) *PeerState { + ps.logger = logger + return ps +} + +// GetRoundState returns an shallow copy of the PeerRoundState. +// There's no point in mutating it since it won't change PeerState. +func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + prs := ps.PRS // copy + return &prs +} + +// ToJSON returns a json of PeerState. +func (ps *PeerState) ToJSON() ([]byte, error) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + return tmjson.Marshal(ps) +} + +// GetHeight returns an atomic snapshot of the PeerRoundState's height +// used by the mempool to ensure peers are caught up before broadcasting new txs +func (ps *PeerState) GetHeight() int64 { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return ps.PRS.Height +} + +// SetHasProposal sets the given proposal as known for the peer. +func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round { + return + } + + if ps.PRS.Proposal { + return + } + + ps.PRS.Proposal = true + + // ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage + if ps.PRS.ProposalBlockParts != nil { + return + } + + ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader + ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total)) + ps.PRS.ProposalPOLRound = proposal.POLRound + ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received. +} + +// InitProposalBlockParts initializes the peer's proposal block parts header and bit array. +func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.PRS.ProposalBlockParts != nil { + return + } + + ps.PRS.ProposalBlockPartSetHeader = partSetHeader + ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total)) +} + +// SetHasProposalBlockPart sets the given block part index as known for the peer. +func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.PRS.Height != height || ps.PRS.Round != round { + return + } + + ps.PRS.ProposalBlockParts.SetIndex(index, true) +} + +// PickSendVote picks a vote and sends it to the peer. +// Returns true if vote was sent. +func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { + if vote, ok := ps.PickVoteToSend(votes); ok { + ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) + if ps.peer.NewTrySend(p2p.Envelope{ + ChannelID: VoteChannel, + Message: vote.ToProto(), + }) { + ps.SetHasVote(vote) + return true + } + return false + } + return false +} + +// PickVoteToSend picks a vote to send to the peer. +// Returns true if a vote was picked. +// NOTE: `votes` must be the correct Size() for the Height(). +func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if votes.Size() == 0 { + return nil, false + } + + height, round, votesType, size := + votes.GetHeight(), votes.GetRound(), tmproto.SignedMsgType(votes.Type()), votes.Size() + + // Lazily set data using 'votes'. + if votes.IsCommit() { + ps.ensureCatchupCommitRound(height, round, size) + } + ps.ensureVoteBitArrays(height, size) + + psVotes := ps.getVoteBitArray(height, round, votesType) + if psVotes == nil { + return nil, false // Not something worth sending + } + if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok { + return votes.GetByIndex(int32(index)), true + } + return nil, false +} + +func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray { + if !types.IsVoteTypeValid(votesType) { + return nil + } + + if ps.PRS.Height == height { + if ps.PRS.Round == round { + switch votesType { + case tmproto.PrevoteType: + return ps.PRS.Prevotes + case tmproto.PrecommitType: + return ps.PRS.Precommits + } + } + if ps.PRS.CatchupCommitRound == round { + switch votesType { + case tmproto.PrevoteType: + return nil + case tmproto.PrecommitType: + return ps.PRS.CatchupCommit + } + } + if ps.PRS.ProposalPOLRound == round { + switch votesType { + case tmproto.PrevoteType: + return ps.PRS.ProposalPOL + case tmproto.PrecommitType: + return nil + } + } + return nil + } + if ps.PRS.Height == height+1 { + if ps.PRS.LastCommitRound == round { + switch votesType { + case tmproto.PrevoteType: + return nil + case tmproto.PrecommitType: + return ps.PRS.LastCommit + } + } + return nil + } + return nil +} + +// 'round': A round for which we have a +2/3 commit. +func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) { + if ps.PRS.Height != height { + return + } + /* + NOTE: This is wrong, 'round' could change. + e.g. if orig round is not the same as block LastCommit round. + if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { + panic(fmt.Sprintf( + "Conflicting CatchupCommitRound. Height: %v, + Orig: %v, + New: %v", + height, + ps.CatchupCommitRound, + round)) + } + */ + if ps.PRS.CatchupCommitRound == round { + return // Nothing to do! + } + ps.PRS.CatchupCommitRound = round + if round == ps.PRS.Round { + ps.PRS.CatchupCommit = ps.PRS.Precommits + } else { + ps.PRS.CatchupCommit = bits.NewBitArray(numValidators) + } +} + +// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking +// what votes this peer has received. +// NOTE: It's important to make sure that numValidators actually matches +// what the node sees as the number of validators for height. +func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + ps.ensureVoteBitArrays(height, numValidators) +} + +func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) { + if ps.PRS.Height == height { + if ps.PRS.Prevotes == nil { + ps.PRS.Prevotes = bits.NewBitArray(numValidators) + } + if ps.PRS.Precommits == nil { + ps.PRS.Precommits = bits.NewBitArray(numValidators) + } + if ps.PRS.CatchupCommit == nil { + ps.PRS.CatchupCommit = bits.NewBitArray(numValidators) + } + if ps.PRS.ProposalPOL == nil { + ps.PRS.ProposalPOL = bits.NewBitArray(numValidators) + } + } else if ps.PRS.Height == height+1 { + if ps.PRS.LastCommit == nil { + ps.PRS.LastCommit = bits.NewBitArray(numValidators) + } + } +} + +// RecordVote increments internal votes related statistics for this peer. +// It returns the total number of added votes. +func (ps *PeerState) RecordVote() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + ps.Stats.Votes++ + + return ps.Stats.Votes +} + +// VotesSent returns the number of blocks for which peer has been sending us +// votes. +func (ps *PeerState) VotesSent() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + return ps.Stats.Votes +} + +// RecordBlockPart increments internal block part related statistics for this peer. +// It returns the total number of added block parts. +func (ps *PeerState) RecordBlockPart() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + ps.Stats.BlockParts++ + return ps.Stats.BlockParts +} + +// BlockPartsSent returns the number of useful block parts the peer has sent us. +func (ps *PeerState) BlockPartsSent() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + return ps.Stats.BlockParts +} + +// SetHasVote sets the given vote as known by the peer +func (ps *PeerState) SetHasVote(vote *types.Vote) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex) +} + +func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) { + logger := ps.logger.With( + "peerH/R", + fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round), + "H/R", + fmt.Sprintf("%d/%d", height, round)) + logger.Debug("setHasVote", "type", voteType, "index", index) + + // NOTE: some may be nil BitArrays -> no side effects. + psVotes := ps.getVoteBitArray(height, round, voteType) + if psVotes != nil { + psVotes.SetIndex(int(index), true) + } +} + +// ApplyNewRoundStepMessage updates the peer state for the new round. +func (ps *PeerState) ApplyNewRoundStepMessage(msg *tmcon.NewRoundStepMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + // Ignore duplicates or decreases + if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 { + return + } + + // Just remember these values. + psHeight := ps.PRS.Height + psRound := ps.PRS.Round + psCatchupCommitRound := ps.PRS.CatchupCommitRound + psCatchupCommit := ps.PRS.CatchupCommit + + startTime := tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second) + ps.PRS.Height = msg.Height + ps.PRS.Round = msg.Round + ps.PRS.Step = msg.Step + ps.PRS.StartTime = startTime + if psHeight != msg.Height || psRound != msg.Round { + ps.PRS.Proposal = false + ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{} + ps.PRS.ProposalBlockParts = nil + ps.PRS.ProposalPOLRound = -1 + ps.PRS.ProposalPOL = nil + // We'll update the BitArray capacity later. + ps.PRS.Prevotes = nil + ps.PRS.Precommits = nil + } + if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound { + // Peer caught up to CatchupCommitRound. + // Preserve psCatchupCommit! + // NOTE: We prefer to use prs.Precommits if + // pr.Round matches pr.CatchupCommitRound. + ps.PRS.Precommits = psCatchupCommit + } + if psHeight != msg.Height { + // Shift Precommits to LastCommit. + if psHeight+1 == msg.Height && psRound == msg.LastCommitRound { + ps.PRS.LastCommitRound = msg.LastCommitRound + ps.PRS.LastCommit = ps.PRS.Precommits + } else { + ps.PRS.LastCommitRound = msg.LastCommitRound + ps.PRS.LastCommit = nil + } + // We'll update the BitArray capacity later. + ps.PRS.CatchupCommitRound = -1 + ps.PRS.CatchupCommit = nil + } +} + +// ApplyNewValidBlockMessage updates the peer state for the new valid block. +func (ps *PeerState) ApplyNewValidBlockMessage(msg *tmcon.NewValidBlockMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.PRS.Height != msg.Height { + return + } + + if ps.PRS.Round != msg.Round && !msg.IsCommit { + return + } + + ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader + ps.PRS.ProposalBlockParts = msg.BlockParts +} + +// ApplyProposalPOLMessage updates the peer state for the new proposal POL. +func (ps *PeerState) ApplyProposalPOLMessage(msg *tmcon.ProposalPOLMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.PRS.Height != msg.Height { + return + } + if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound { + return + } + + // TODO: Merge onto existing ps.PRS.ProposalPOL? + // We might have sent some prevotes in the meantime. + ps.PRS.ProposalPOL = msg.ProposalPOL +} + +// ApplyHasVoteMessage updates the peer state for the new vote. +func (ps *PeerState) ApplyHasVoteMessage(msg *tmcon.HasVoteMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.PRS.Height != msg.Height { + return + } + + ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) +} + +// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes +// it claims to have for the corresponding BlockID. +// `ourVotes` is a BitArray of votes we have for msg.BlockID +// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), +// we conservatively overwrite ps's votes w/ msg.Votes. +func (ps *PeerState) ApplyVoteSetBitsMessage(msg *tmcon.VoteSetBitsMessage, ourVotes *bits.BitArray) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type) + if votes != nil { + if ourVotes == nil { + votes.Update(msg.Votes) + } else { + otherVotes := votes.Sub(ourVotes) + hasVotes := otherVotes.Or(msg.Votes) + votes.Update(hasVotes) + } + } +} + +// String returns a string representation of the PeerState +func (ps *PeerState) String() string { + return ps.StringIndented("") +} + +// StringIndented returns a string representation of the PeerState +func (ps *PeerState) StringIndented(indent string) string { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return fmt.Sprintf(`PeerState{ +%s Key %v +%s RoundState %v +%s Stats %v +%s}`, + indent, ps.peer.ID(), + indent, ps.PRS.StringIndented(indent+" "), + indent, ps.Stats, + indent) +} + +//----------------------------------------------------------------------------- + +// func init() { +// tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage") +// tmjson.RegisterType(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage") +// tmjson.RegisterType(&ProposalMessage{}, "tendermint/Proposal") +// tmjson.RegisterType(&ProposalPOLMessage{}, "tendermint/ProposalPOL") +// tmjson.RegisterType(&BlockPartMessage{}, "tendermint/BlockPart") +// tmjson.RegisterType(&VoteMessage{}, "tendermint/Vote") +// tmjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote") +// tmjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23") +// tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits") +// } + +func decodeMsg(bz []byte) (msg tmcon.Message, err error) { + pb := &tmcons.Message{} + if err = proto.Unmarshal(bz, pb); err != nil { + return msg, err + } + + return tmcon.MsgFromProto(pb) +}