eventbus: publish without contexts (#8369)

This commit is contained in:
Sam Kleinman
2022-04-18 16:28:31 -04:00
committed by GitHub
parent 889341152a
commit c372390fea
29 changed files with 256 additions and 280 deletions

View File

@@ -359,7 +359,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
go r.requestRoutine(ctx, bsCh)
go r.poolRoutine(ctx, true, bsCh)
if err := r.PublishStatus(ctx, types.EventDataBlockSyncStatus{
if err := r.PublishStatus(types.EventDataBlockSyncStatus{
Complete: false,
Height: state.LastBlockHeight,
}); err != nil {
@@ -609,11 +609,11 @@ func (r *Reactor) GetRemainingSyncTime() time.Duration {
return time.Duration(int64(remain * float64(time.Second)))
}
func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataBlockSyncStatus) error {
func (r *Reactor) PublishStatus(event types.EventDataBlockSyncStatus) error {
if r.eventBus == nil {
return errors.New("event bus is not configured")
}
return r.eventBus.PublishEventBlockSyncStatus(ctx, event)
return r.eventBus.PublishEventBlockSyncStatus(event)
}
// atomicBool is an atomic Boolean, safe for concurrent use by multiple

View File

@@ -96,7 +96,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus, sm.NopMetrics())
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
cs, err := NewState(logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
require.NoError(t, err)
// set private validator
pv := privVals[i]

View File

@@ -491,8 +491,7 @@ func newStateWithConfigAndBlockStore(
require.NoError(t, eventBus.Start(ctx))
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus, sm.NopMetrics())
cs, err := NewState(ctx,
logger.With("module", "consensus"),
cs, err := NewState(logger.With("module", "consensus"),
thisConfig.Consensus,
stateStore,
blockExec,

View File

@@ -210,7 +210,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
// leak the goroutine when stopping the reactor.
go r.peerStatsRoutine(ctx, peerUpdates)
r.subscribeToBroadcastEvents(chBundle.state)
r.subscribeToBroadcastEvents(ctx, chBundle.state)
if !r.WaitSync() {
if err := r.state.Start(ctx); err != nil {
@@ -260,7 +260,7 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
// NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
// NewRoundStepMessage.
r.state.updateToState(ctx, state)
r.state.updateToState(state)
if err := r.state.Start(ctx); err != nil {
panic(fmt.Sprintf(`failed to start consensus state: %v
@@ -284,7 +284,7 @@ conR:
}
d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := r.eventBus.PublishEventBlockSyncStatus(ctx, d); err != nil {
if err := r.eventBus.PublishEventBlockSyncStatus(d); err != nil {
r.logger.Error("failed to emit the blocksync complete event", "err", err)
}
}
@@ -344,13 +344,13 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote,
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) {
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventNewRoundStepValue,
func(ctx context.Context, data tmevents.EventData) error {
func(data tmevents.EventData) error {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState), stateCh); err != nil {
return err
}
@@ -371,7 +371,7 @@ func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) {
err = r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventValidBlockValue,
func(ctx context.Context, data tmevents.EventData) error {
func(data tmevents.EventData) error {
return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState), stateCh)
},
)
@@ -382,7 +382,7 @@ func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) {
err = r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventVoteValue,
func(ctx context.Context, data tmevents.EventData) error {
func(data tmevents.EventData) error {
return r.broadcastHasVoteMessage(ctx, data.(*types.Vote), stateCh)
},
)

View File

@@ -505,7 +505,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus, sm.NopMetrics())
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
cs, err := NewState(logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus)
require.NoError(t, err)
cs.SetPrivValidator(ctx, pv)

View File

@@ -145,7 +145,7 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event
pb.cs.Stop()
pb.cs.Wait()
newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
newCS, err := NewState(pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, pb.cs.eventBus)
if err != nil {
return err
@@ -350,7 +350,7 @@ func newConsensusStateForReplay(
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore, eventBus, sm.NopMetrics())
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
consensusState, err := NewState(logger, csConfig, stateStore, blockExec,
blockStore, mempool, evpool, eventBus)
if err != nil {
return nil, err

View File

@@ -194,7 +194,6 @@ func SkipStateStoreBootstrap(sm *State) {
// NewState returns a new State.
func NewState(
ctx context.Context,
logger log.Logger,
cfg *config.ConsensusConfig,
store sm.Store,
@@ -240,7 +239,7 @@ func NewState(
// node-fragments gracefully while letting the nodes
// themselves avoid this.
if !cs.skipBootstrapping {
if err := cs.updateStateFromStore(ctx); err != nil {
if err := cs.updateStateFromStore(); err != nil {
return nil, err
}
}
@@ -248,7 +247,7 @@ func NewState(
return cs, nil
}
func (cs *State) updateStateFromStore(ctx context.Context) error {
func (cs *State) updateStateFromStore() error {
if cs.initialStatePopulated {
return nil
}
@@ -265,7 +264,7 @@ func (cs *State) updateStateFromStore(ctx context.Context) error {
cs.reconstructLastCommit(state)
}
cs.updateToState(ctx, state)
cs.updateToState(state)
cs.initialStatePopulated = true
return nil
@@ -393,7 +392,7 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
func (cs *State) OnStart(ctx context.Context) error {
if err := cs.updateStateFromStore(ctx); err != nil {
if err := cs.updateStateFromStore(); err != nil {
return err
}
@@ -718,7 +717,7 @@ func (cs *State) reconstructLastCommit(state sm.State) {
// Updates State and increments height to match that of state.
// The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight.
func (cs *State) updateToState(ctx context.Context, state sm.State) {
func (cs *State) updateToState(state sm.State) {
if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight {
panic(fmt.Sprintf(
"updateToState() expected state height of %v but found %v",
@@ -753,7 +752,7 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
"new_height", state.LastBlockHeight+1,
"old_height", cs.state.LastBlockHeight+1,
)
cs.newStep(ctx)
cs.newStep()
return
}
}
@@ -823,10 +822,10 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
cs.state = state
// Finally, broadcast RoundState
cs.newStep(ctx)
cs.newStep()
}
func (cs *State) newStep(ctx context.Context) {
func (cs *State) newStep() {
rs := cs.RoundStateEvent()
if err := cs.wal.Write(rs); err != nil {
cs.logger.Error("failed writing to WAL", "err", err)
@@ -836,11 +835,11 @@ func (cs *State) newStep(ctx context.Context) {
// newStep is called by updateToState in NewState before the eventBus is set!
if cs.eventBus != nil {
if err := cs.eventBus.PublishEventNewRoundStep(ctx, rs); err != nil {
if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil {
cs.logger.Error("failed publishing new round step", "err", err)
}
cs.evsw.FireEvent(ctx, types.EventNewRoundStepValue, &cs.RoundState)
cs.evsw.FireEvent(types.EventNewRoundStepValue, &cs.RoundState)
}
}
@@ -977,7 +976,7 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(ctx, msg, peerID)
added, err = cs.addProposalBlockPart(msg, peerID)
// We unlock here to yield to any routines that need to read the the RoundState.
// Previously, this code held the lock from the point at which the final block
@@ -1083,21 +1082,21 @@ func (cs *State) handleTimeout(
cs.enterPropose(ctx, ti.Height, 0)
case cstypes.RoundStepPropose:
if err := cs.eventBus.PublishEventTimeoutPropose(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent()); err != nil {
cs.logger.Error("failed publishing timeout propose", "err", err)
}
cs.enterPrevote(ctx, ti.Height, ti.Round)
case cstypes.RoundStepPrevoteWait:
if err := cs.eventBus.PublishEventTimeoutWait(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil {
cs.logger.Error("failed publishing timeout wait", "err", err)
}
cs.enterPrecommit(ctx, ti.Height, ti.Round)
case cstypes.RoundStepPrecommitWait:
if err := cs.eventBus.PublishEventTimeoutWait(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil {
cs.logger.Error("failed publishing timeout wait", "err", err)
}
@@ -1200,7 +1199,7 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
cs.Votes.SetRound(r) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
if err := cs.eventBus.PublishEventNewRound(ctx, cs.NewRoundEvent()); err != nil {
if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
cs.logger.Error("failed publishing new round", "err", err)
}
// Wait for txs to be available in the mempool
@@ -1263,7 +1262,7 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) {
defer func() {
// Done enterPropose:
cs.updateRoundStep(round, cstypes.RoundStepPropose)
cs.newStep(ctx)
cs.newStep()
// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
@@ -1455,7 +1454,7 @@ func (cs *State) enterPrevote(ctx context.Context, height int64, round int32) {
defer func() {
// Done enterPrevote:
cs.updateRoundStep(round, cstypes.RoundStepPrevote)
cs.newStep(ctx)
cs.newStep()
}()
logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
@@ -1606,7 +1605,7 @@ func (cs *State) defaultDoPrevote(ctx context.Context, height int64, round int32
}
// Enter: any +2/3 prevotes at next round.
func (cs *State) enterPrevoteWait(ctx context.Context, height int64, round int32) {
func (cs *State) enterPrevoteWait(height int64, round int32) {
logger := cs.logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrevoteWait <= cs.Step) {
@@ -1629,7 +1628,7 @@ func (cs *State) enterPrevoteWait(ctx context.Context, height int64, round int32
defer func() {
// Done enterPrevoteWait:
cs.updateRoundStep(round, cstypes.RoundStepPrevoteWait)
cs.newStep(ctx)
cs.newStep()
}()
// Wait for some more prevotes; enterPrecommit
@@ -1657,7 +1656,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32)
defer func() {
// Done enterPrecommit:
cs.updateRoundStep(round, cstypes.RoundStepPrecommit)
cs.newStep(ctx)
cs.newStep()
}()
// check for a polka
@@ -1676,7 +1675,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32)
}
// At this point +2/3 prevoted for a particular block or nil.
if err := cs.eventBus.PublishEventPolka(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventPolka(cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing polka", "err", err)
}
@@ -1713,7 +1712,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32)
logger.Debug("precommit step: +2/3 prevoted locked block; relocking")
cs.LockedRound = round
if err := cs.eventBus.PublishEventRelock(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventRelock(cs.RoundStateEvent()); err != nil {
logger.Error("precommit step: failed publishing event relock", "err", err)
}
@@ -1736,7 +1735,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32)
cs.LockedBlock = cs.ProposalBlock
cs.LockedBlockParts = cs.ProposalBlockParts
if err := cs.eventBus.PublishEventLock(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventLock(cs.RoundStateEvent()); err != nil {
logger.Error("precommit step: failed publishing event lock", "err", err)
}
@@ -1758,7 +1757,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32)
}
// Enter: any +2/3 precommits for next round.
func (cs *State) enterPrecommitWait(ctx context.Context, height int64, round int32) {
func (cs *State) enterPrecommitWait(height int64, round int32) {
logger := cs.logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.TriggeredTimeoutPrecommit) {
@@ -1782,7 +1781,7 @@ func (cs *State) enterPrecommitWait(ctx context.Context, height int64, round int
defer func() {
// Done enterPrecommitWait:
cs.TriggeredTimeoutPrecommit = true
cs.newStep(ctx)
cs.newStep()
}()
// wait for some more precommits; enterNewRound
@@ -1809,7 +1808,7 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3
cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit)
cs.CommitRound = commitRound
cs.CommitTime = tmtime.Now()
cs.newStep(ctx)
cs.newStep()
// Maybe finalize immediately.
cs.tryFinalizeCommit(ctx, height)
@@ -1844,11 +1843,11 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3
cs.metrics.MarkBlockGossipStarted()
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
if err := cs.eventBus.PublishEventValidBlock(ctx, cs.RoundStateEvent()); err != nil {
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil {
logger.Error("failed publishing valid block", "err", err)
}
cs.evsw.FireEvent(ctx, types.EventValidBlockValue, &cs.RoundState)
cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState)
}
}
}
@@ -1975,7 +1974,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
cs.RecordMetrics(height, block)
// NewHeightStep!
cs.updateToState(ctx, stateCopy)
cs.updateToState(stateCopy)
// Private validator might have changed it's key pair => refetch pubkey.
if err := cs.updatePrivValidatorPubKey(ctx); err != nil {
@@ -2130,7 +2129,6 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func (cs *State) addProposalBlockPart(
ctx context.Context,
msg *BlockPartMessage,
peerID types.NodeID,
) (added bool, err error) {
@@ -2196,7 +2194,7 @@ func (cs *State) addProposalBlockPart(
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.logger.Info("received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil {
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
cs.logger.Error("failed publishing event complete proposal", "err", err)
}
}
@@ -2315,11 +2313,11 @@ func (cs *State) addVote(
}
cs.logger.Debug("added vote to last precommits", "last_commit", cs.LastCommit.StringShort())
if err := cs.eventBus.PublishEventVote(ctx, types.EventDataVote{Vote: vote}); err != nil {
if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil {
return added, err
}
cs.evsw.FireEvent(ctx, types.EventVoteValue, vote)
cs.evsw.FireEvent(types.EventVoteValue, vote)
// if we can skip timeoutCommit and have all the votes now,
if cs.bypassCommitTimeout() && cs.LastCommit.HasAll() {
@@ -2352,10 +2350,10 @@ func (cs *State) addVote(
return
}
if err := cs.eventBus.PublishEventVote(ctx, types.EventDataVote{Vote: vote}); err != nil {
if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil {
return added, err
}
cs.evsw.FireEvent(ctx, types.EventVoteValue, vote)
cs.evsw.FireEvent(types.EventVoteValue, vote)
switch vote.Type {
case tmproto.PrevoteType:
@@ -2390,8 +2388,8 @@ func (cs *State) addVote(
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
}
cs.evsw.FireEvent(ctx, types.EventValidBlockValue, &cs.RoundState)
if err := cs.eventBus.PublishEventValidBlock(ctx, cs.RoundStateEvent()); err != nil {
cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState)
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil {
return added, err
}
}
@@ -2408,7 +2406,7 @@ func (cs *State) addVote(
if ok && (cs.isProposalComplete() || blockID.IsNil()) {
cs.enterPrecommit(ctx, height, vote.Round)
} else if prevotes.HasTwoThirdsAny() {
cs.enterPrevoteWait(ctx, height, vote.Round)
cs.enterPrevoteWait(height, vote.Round)
}
case cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round:
@@ -2439,11 +2437,11 @@ func (cs *State) addVote(
cs.enterNewRound(ctx, cs.Height, 0)
}
} else {
cs.enterPrecommitWait(ctx, height, vote.Round)
cs.enterPrecommitWait(height, vote.Round)
}
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
cs.enterNewRound(ctx, height, vote.Round)
cs.enterPrecommitWait(ctx, height, vote.Round)
cs.enterPrecommitWait(height, vote.Round)
}
default:

View File

@@ -81,7 +81,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyApp, mempool, evpool, blockStore, eventBus, sm.NopMetrics())
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
consensusState, err := NewState(logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
if err != nil {
t.Fatal(err)
}

View File

@@ -66,7 +66,7 @@ func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) e
return b.pubsub.Observe(ctx, observe, queries...)
}
func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData types.EventData) error {
func (b *EventBus) Publish(eventValue string, eventData types.EventData) error {
tokens := strings.Split(types.EventTypeKey, ".")
event := abci.Event{
Type: tokens[0],
@@ -78,19 +78,19 @@ func (b *EventBus) Publish(ctx context.Context, eventValue string, eventData typ
},
}
return b.pubsub.PublishWithEvents(ctx, eventData, []abci.Event{event})
return b.pubsub.PublishWithEvents(eventData, []abci.Event{event})
}
func (b *EventBus) PublishEventNewBlock(ctx context.Context, data types.EventDataNewBlock) error {
func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error {
events := data.ResultFinalizeBlock.Events
// add Tendermint-reserved new block event
events = append(events, types.EventNewBlock)
return b.pubsub.PublishWithEvents(ctx, data, events)
return b.pubsub.PublishWithEvents(data, events)
}
func (b *EventBus) PublishEventNewBlockHeader(ctx context.Context, data types.EventDataNewBlockHeader) error {
func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader) error {
// no explicit deadline for publishing events
events := data.ResultFinalizeBlock.Events
@@ -98,33 +98,33 @@ func (b *EventBus) PublishEventNewBlockHeader(ctx context.Context, data types.Ev
// add Tendermint-reserved new block header event
events = append(events, types.EventNewBlockHeader)
return b.pubsub.PublishWithEvents(ctx, data, events)
return b.pubsub.PublishWithEvents(data, events)
}
func (b *EventBus) PublishEventNewEvidence(ctx context.Context, evidence types.EventDataNewEvidence) error {
return b.Publish(ctx, types.EventNewEvidenceValue, evidence)
func (b *EventBus) PublishEventNewEvidence(evidence types.EventDataNewEvidence) error {
return b.Publish(types.EventNewEvidenceValue, evidence)
}
func (b *EventBus) PublishEventVote(ctx context.Context, data types.EventDataVote) error {
return b.Publish(ctx, types.EventVoteValue, data)
func (b *EventBus) PublishEventVote(data types.EventDataVote) error {
return b.Publish(types.EventVoteValue, data)
}
func (b *EventBus) PublishEventValidBlock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventValidBlockValue, data)
func (b *EventBus) PublishEventValidBlock(data types.EventDataRoundState) error {
return b.Publish(types.EventValidBlockValue, data)
}
func (b *EventBus) PublishEventBlockSyncStatus(ctx context.Context, data types.EventDataBlockSyncStatus) error {
return b.Publish(ctx, types.EventBlockSyncStatusValue, data)
func (b *EventBus) PublishEventBlockSyncStatus(data types.EventDataBlockSyncStatus) error {
return b.Publish(types.EventBlockSyncStatusValue, data)
}
func (b *EventBus) PublishEventStateSyncStatus(ctx context.Context, data types.EventDataStateSyncStatus) error {
return b.Publish(ctx, types.EventStateSyncStatusValue, data)
func (b *EventBus) PublishEventStateSyncStatus(data types.EventDataStateSyncStatus) error {
return b.Publish(types.EventStateSyncStatusValue, data)
}
// PublishEventTx publishes tx event with events from Result. Note it will add
// predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys
// will be overwritten.
func (b *EventBus) PublishEventTx(ctx context.Context, data types.EventDataTx) error {
func (b *EventBus) PublishEventTx(data types.EventDataTx) error {
events := data.Result.Events
// add Tendermint-reserved events
@@ -152,45 +152,45 @@ func (b *EventBus) PublishEventTx(ctx context.Context, data types.EventDataTx) e
},
})
return b.pubsub.PublishWithEvents(ctx, data, events)
return b.pubsub.PublishWithEvents(data, events)
}
func (b *EventBus) PublishEventNewRoundStep(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventNewRoundStepValue, data)
func (b *EventBus) PublishEventNewRoundStep(data types.EventDataRoundState) error {
return b.Publish(types.EventNewRoundStepValue, data)
}
func (b *EventBus) PublishEventTimeoutPropose(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventTimeoutProposeValue, data)
func (b *EventBus) PublishEventTimeoutPropose(data types.EventDataRoundState) error {
return b.Publish(types.EventTimeoutProposeValue, data)
}
func (b *EventBus) PublishEventTimeoutWait(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventTimeoutWaitValue, data)
func (b *EventBus) PublishEventTimeoutWait(data types.EventDataRoundState) error {
return b.Publish(types.EventTimeoutWaitValue, data)
}
func (b *EventBus) PublishEventNewRound(ctx context.Context, data types.EventDataNewRound) error {
return b.Publish(ctx, types.EventNewRoundValue, data)
func (b *EventBus) PublishEventNewRound(data types.EventDataNewRound) error {
return b.Publish(types.EventNewRoundValue, data)
}
func (b *EventBus) PublishEventCompleteProposal(ctx context.Context, data types.EventDataCompleteProposal) error {
return b.Publish(ctx, types.EventCompleteProposalValue, data)
func (b *EventBus) PublishEventCompleteProposal(data types.EventDataCompleteProposal) error {
return b.Publish(types.EventCompleteProposalValue, data)
}
func (b *EventBus) PublishEventPolka(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventPolkaValue, data)
func (b *EventBus) PublishEventPolka(data types.EventDataRoundState) error {
return b.Publish(types.EventPolkaValue, data)
}
func (b *EventBus) PublishEventRelock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventRelockValue, data)
func (b *EventBus) PublishEventRelock(data types.EventDataRoundState) error {
return b.Publish(types.EventRelockValue, data)
}
func (b *EventBus) PublishEventLock(ctx context.Context, data types.EventDataRoundState) error {
return b.Publish(ctx, types.EventLockValue, data)
func (b *EventBus) PublishEventLock(data types.EventDataRoundState) error {
return b.Publish(types.EventLockValue, data)
}
func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data types.EventDataValidatorSetUpdates) error {
return b.Publish(ctx, types.EventValidatorSetUpdatesValue, data)
func (b *EventBus) PublishEventValidatorSetUpdates(data types.EventDataValidatorSetUpdates) error {
return b.Publish(types.EventValidatorSetUpdatesValue, data)
}
func (b *EventBus) PublishEventEvidenceValidated(ctx context.Context, evidence types.EventDataEvidenceValidated) error {
return b.Publish(ctx, types.EventEvidenceValidatedValue, evidence)
func (b *EventBus) PublishEventEvidenceValidated(evidence types.EventDataEvidenceValidated) error {
return b.Publish(types.EventEvidenceValidatedValue, evidence)
}

View File

@@ -55,7 +55,7 @@ func TestEventBusPublishEventTx(t *testing.T) {
assert.Equal(t, result, edt.Result)
}()
err = eventBus.PublishEventTx(ctx, types.EventDataTx{
err = eventBus.PublishEventTx(types.EventDataTx{
TxResult: abci.TxResult{
Height: 1,
Index: 0,
@@ -112,7 +112,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
assert.Equal(t, resultFinalizeBlock, edt.ResultFinalizeBlock)
}()
err = eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{
err = eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
BlockID: blockID,
ResultFinalizeBlock: resultFinalizeBlock,
@@ -223,7 +223,7 @@ func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
}
}()
assert.NoError(t, eventBus.PublishEventTx(ctx, types.EventDataTx{
assert.NoError(t, eventBus.PublishEventTx(types.EventDataTx{
TxResult: abci.TxResult{
Height: 1,
Index: 0,
@@ -280,7 +280,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
assert.Equal(t, resultFinalizeBlock, edt.ResultFinalizeBlock)
}()
err = eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
ResultFinalizeBlock: resultFinalizeBlock,
})
@@ -322,7 +322,7 @@ func TestEventBusPublishEventEvidenceValidated(t *testing.T) {
assert.Equal(t, int64(1), edt.Height)
}()
err = eventBus.PublishEventEvidenceValidated(ctx, types.EventDataEvidenceValidated{
err = eventBus.PublishEventEvidenceValidated(types.EventDataEvidenceValidated{
Evidence: ev,
Height: int64(1),
})
@@ -364,7 +364,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) {
assert.Equal(t, int64(4), edt.Height)
}()
err = eventBus.PublishEventNewEvidence(ctx, types.EventDataNewEvidence{
err = eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{
Evidence: ev,
Height: 4,
})
@@ -408,22 +408,22 @@ func TestEventBusPublish(t *testing.T) {
}
}()
require.NoError(t, eventBus.Publish(ctx, types.EventNewBlockHeaderValue,
require.NoError(t, eventBus.Publish(types.EventNewBlockHeaderValue,
types.EventDataNewBlockHeader{}))
require.NoError(t, eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{}))
require.NoError(t, eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{}))
require.NoError(t, eventBus.PublishEventVote(ctx, types.EventDataVote{}))
require.NoError(t, eventBus.PublishEventNewRoundStep(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutPropose(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutWait(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventNewRound(ctx, types.EventDataNewRound{}))
require.NoError(t, eventBus.PublishEventCompleteProposal(ctx, types.EventDataCompleteProposal{}))
require.NoError(t, eventBus.PublishEventPolka(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventRelock(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventLock(ctx, types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventValidatorSetUpdates(ctx, types.EventDataValidatorSetUpdates{}))
require.NoError(t, eventBus.PublishEventBlockSyncStatus(ctx, types.EventDataBlockSyncStatus{}))
require.NoError(t, eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{}))
require.NoError(t, eventBus.PublishEventNewBlock(types.EventDataNewBlock{}))
require.NoError(t, eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{}))
require.NoError(t, eventBus.PublishEventVote(types.EventDataVote{}))
require.NoError(t, eventBus.PublishEventNewRoundStep(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutPropose(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventTimeoutWait(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventNewRound(types.EventDataNewRound{}))
require.NoError(t, eventBus.PublishEventCompleteProposal(types.EventDataCompleteProposal{}))
require.NoError(t, eventBus.PublishEventPolka(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventRelock(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventLock(types.EventDataRoundState{}))
require.NoError(t, eventBus.PublishEventValidatorSetUpdates(types.EventDataValidatorSetUpdates{}))
require.NoError(t, eventBus.PublishEventBlockSyncStatus(types.EventDataBlockSyncStatus{}))
require.NoError(t, eventBus.PublishEventStateSyncStatus(types.EventDataStateSyncStatus{}))
require.GreaterOrEqual(t, <-count, numEventsExpected)
}
@@ -505,7 +505,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
eventValue = randEventValue()
}
err := eventBus.Publish(ctx, eventValue, types.EventDataString("Gamora"))
err := eventBus.Publish(eventValue, types.EventDataString("Gamora"))
if err != nil {
b.Error(err)
}

View File

@@ -338,7 +338,7 @@ func (evpool *Pool) addPendingEvidence(ctx context.Context, ev types.Evidence) e
return nil
}
return evpool.eventBus.PublishEventEvidenceValidated(ctx, types.EventDataEvidenceValidated{
return evpool.eventBus.PublishEventEvidenceValidated(types.EventDataEvidenceValidated{
Evidence: ev,
Height: ev.Height(),
})

View File

@@ -25,7 +25,7 @@ func BenchmarkTxMempool_CheckTx(b *testing.B) {
// setup the cache and the mempool number for hitting GetEvictableTxs during the
// benchmark. 5000 is the current default mempool size in the TM config.
txmp := setup(ctx, b, client, 10000)
txmp := setup(b, client, 10000)
txmp.config.Size = 5000
rng := rand.New(rand.NewSource(time.Now().UnixNano()))

View File

@@ -72,7 +72,7 @@ func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
}
}
func setup(ctx context.Context, t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
t.Helper()
logger := log.NewNopLogger()
@@ -131,7 +131,7 @@ func TestTxMempool_TxsAvailable(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp := setup(t, client, 0)
txmp.EnableTxsAvailable()
ensureNoTxFire := func() {
@@ -194,7 +194,7 @@ func TestTxMempool_Size(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp := setup(t, client, 0)
txs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@@ -227,7 +227,7 @@ func TestTxMempool_Flush(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp := setup(t, client, 0)
txs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@@ -261,7 +261,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp := setup(t, client, 0)
tTxs := checkTxs(ctx, t, txmp, 100, 0) // all txs request 1 gas unit
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@@ -320,7 +320,7 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp := setup(t, client, 0)
tTxs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@@ -377,7 +377,7 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp := setup(t, client, 0)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes+1)
@@ -403,7 +403,7 @@ func TestTxMempool_CheckTxSamePeer(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
txmp := setup(t, client, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -427,7 +427,7 @@ func TestTxMempool_CheckTxSameSender(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
txmp := setup(t, client, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -458,7 +458,7 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
txmp := setup(t, client, 100)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
checkTxDone := make(chan struct{})
@@ -531,7 +531,7 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 500)
txmp := setup(t, client, 500)
txmp.height = 100
txmp.config.TTLNumBlocks = 10
@@ -612,7 +612,7 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
return testCase.err
}
txmp := setup(ctx, t, client, 0, WithPostCheck(postCheckFn))
txmp := setup(t, client, 0, WithPostCheck(postCheckFn))
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes-1)
_, err := rng.Read(tx)

View File

@@ -68,7 +68,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
require.NoError(t, client.Start(ctx))
t.Cleanup(client.Wait)
mempool := setup(ctx, t, client, 0)
mempool := setup(t, client, 0)
rts.mempools[nodeID] = mempool
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)

View File

@@ -29,6 +29,6 @@ func TestExample(t *testing.T) {
Attributes: []abci.EventAttribute{{Key: "name", Value: "John"}},
},
}
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Tombstone"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Tombstone"), events))
sub.mustReceive(ctx, pubstring("Tombstone"))
}

View File

@@ -287,16 +287,16 @@ func (s *Server) NumClientSubscriptions(clientID string) int {
}
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg types.EventData) error {
return s.publish(ctx, msg, []abci.Event{})
// if the pubsub server has shut down.
func (s *Server) Publish(msg types.EventData) error {
return s.publish(msg, []abci.Event{})
}
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithEvents(ctx context.Context, msg types.EventData, events []abci.Event) error {
return s.publish(ctx, msg, events)
func (s *Server) PublishWithEvents(msg types.EventData, events []abci.Event) error {
return s.publish(msg, events)
}
// OnStop implements part of the Service interface. It is a no-op.
@@ -309,15 +309,13 @@ func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil }
func (s *Server) publish(ctx context.Context, data types.EventData, events []abci.Event) error {
func (s *Server) publish(data types.EventData, events []abci.Event) error {
s.pubs.RLock()
defer s.pubs.RUnlock()
select {
case <-s.done:
return ErrServerStopped
case <-ctx.Done():
return ctx.Err()
case s.queue <- item{
Data: data,
Events: events,

View File

@@ -42,7 +42,7 @@ func TestSubscribeWithArgs(t *testing.T) {
require.Equal(t, 1, s.NumClients())
require.Equal(t, 1, s.NumClientSubscriptions(clientID))
require.NoError(t, s.Publish(ctx, pubstring("Ka-Zar")))
require.NoError(t, s.Publish(pubstring("Ka-Zar")))
sub.mustReceive(ctx, pubstring("Ka-Zar"))
})
t.Run("PositiveLimit", func(t *testing.T) {
@@ -51,7 +51,7 @@ func TestSubscribeWithArgs(t *testing.T) {
Query: query.All,
Limit: 10,
}))
require.NoError(t, s.Publish(ctx, pubstring("Aggamon")))
require.NoError(t, s.Publish(pubstring("Aggamon")))
sub.mustReceive(ctx, pubstring("Aggamon"))
})
}
@@ -72,7 +72,7 @@ func TestObserver(t *testing.T) {
}))
const input = pubstring("Lions and tigers and bears, oh my!")
require.NoError(t, s.Publish(ctx, input))
require.NoError(t, s.Publish(input))
<-done
require.Equal(t, got, input)
}
@@ -106,9 +106,9 @@ func TestPublishDoesNotBlock(t *testing.T) {
go func() {
defer close(published)
require.NoError(t, s.Publish(ctx, pubstring("Quicksilver")))
require.NoError(t, s.Publish(ctx, pubstring("Asylum")))
require.NoError(t, s.Publish(ctx, pubstring("Ivan")))
require.NoError(t, s.Publish(pubstring("Quicksilver")))
require.NoError(t, s.Publish(pubstring("Asylum")))
require.NoError(t, s.Publish(pubstring("Ivan")))
}()
select {
@@ -149,9 +149,9 @@ func TestSlowSubscriber(t *testing.T) {
Query: query.All,
}))
require.NoError(t, s.Publish(ctx, pubstring("Fat Cobra")))
require.NoError(t, s.Publish(ctx, pubstring("Viper")))
require.NoError(t, s.Publish(ctx, pubstring("Black Panther")))
require.NoError(t, s.Publish(pubstring("Fat Cobra")))
require.NoError(t, s.Publish(pubstring("Viper")))
require.NoError(t, s.Publish(pubstring("Black Panther")))
// We had capacity for one item, so we should get that item, but after that
// the subscription should have been terminated by the publisher.
@@ -176,7 +176,7 @@ func TestDifferentClients(t *testing.T) {
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}},
}}
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Iceman"), events))
sub1.mustReceive(ctx, pubstring("Iceman"))
sub2 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
@@ -195,7 +195,7 @@ func TestDifferentClients(t *testing.T) {
},
}
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Ultimo"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Ultimo"), events))
sub1.mustReceive(ctx, pubstring("Ultimo"))
sub2.mustReceive(ctx, pubstring("Ultimo"))
@@ -210,7 +210,7 @@ func TestDifferentClients(t *testing.T) {
Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}},
}}
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Valeria Richards"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Valeria Richards"), events))
sub3.mustTimeOut(ctx, 100*time.Millisecond)
}
@@ -259,7 +259,7 @@ func TestSubscribeDuplicateKeys(t *testing.T) {
},
}
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Iceman"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Iceman"), events))
if tc.expected != nil {
sub.mustReceive(ctx, tc.expected)
@@ -288,7 +288,7 @@ func TestClientSubscribesTwice(t *testing.T) {
Query: q,
}))
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Goblin Queen"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Goblin Queen"), events))
sub1.mustReceive(ctx, pubstring("Goblin Queen"))
// Subscribing a second time with the same client ID and query fails.
@@ -302,7 +302,7 @@ func TestClientSubscribesTwice(t *testing.T) {
}
// The attempt to re-subscribe does not disrupt the existing sub.
require.NoError(t, s.PublishWithEvents(ctx, pubstring("Spider-Man"), events))
require.NoError(t, s.PublishWithEvents(pubstring("Spider-Man"), events))
sub1.mustReceive(ctx, pubstring("Spider-Man"))
}
@@ -325,7 +325,7 @@ func TestUnsubscribe(t *testing.T) {
}))
// Publishing should still work.
require.NoError(t, s.Publish(ctx, pubstring("Nick Fury")))
require.NoError(t, s.Publish(pubstring("Nick Fury")))
// The unsubscribed subscriber should report as such.
sub.mustFail(ctx, pubsub.ErrUnsubscribed)
@@ -373,7 +373,7 @@ func TestResubscribe(t *testing.T) {
sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, args))
require.NoError(t, s.Publish(ctx, pubstring("Cable")))
require.NoError(t, s.Publish(pubstring("Cable")))
sub.mustReceive(ctx, pubstring("Cable"))
}
@@ -394,7 +394,7 @@ func TestUnsubscribeAll(t *testing.T) {
}))
require.NoError(t, s.UnsubscribeAll(ctx, clientID))
require.NoError(t, s.Publish(ctx, pubstring("Nick Fury")))
require.NoError(t, s.Publish(pubstring("Nick Fury")))
sub1.mustFail(ctx, pubsub.ErrUnsubscribed)
sub2.mustFail(ctx, pubsub.ErrUnsubscribed)
@@ -410,13 +410,24 @@ func TestBufferCapacity(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, s.Publish(ctx, pubstring("Nighthawk")))
require.NoError(t, s.Publish(ctx, pubstring("Sage")))
require.NoError(t, s.Publish(pubstring("Nighthawk")))
require.NoError(t, s.Publish(pubstring("Sage")))
ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
require.ErrorIs(t, s.Publish(ctx, pubstring("Ironclad")), context.DeadlineExceeded)
sig := make(chan struct{})
go func() { defer close(sig); _ = s.Publish(pubstring("Ironclad")) }()
select {
case <-sig:
t.Fatal("should not fire")
case <-ctx.Done():
return
}
}
func newTestServer(ctx context.Context, t testing.TB, logger log.Logger) *pubsub.Server {

View File

@@ -291,7 +291,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(ctx, blockExec.logger, blockExec.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
return state, nil
}
@@ -530,7 +530,6 @@ func (state State) Update(
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(
ctx context.Context,
logger log.Logger,
eventBus types.BlockEventPublisher,
block *types.Block,
@@ -538,7 +537,7 @@ func fireEvents(
finalizeBlockResponse *abci.ResponseFinalizeBlock,
validatorUpdates []*types.Validator,
) {
if err := eventBus.PublishEventNewBlock(ctx, types.EventDataNewBlock{
if err := eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
BlockID: blockID,
ResultFinalizeBlock: *finalizeBlockResponse,
@@ -546,7 +545,7 @@ func fireEvents(
logger.Error("failed publishing new block", "err", err)
}
if err := eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{
if err := eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
NumTxs: int64(len(block.Txs)),
ResultFinalizeBlock: *finalizeBlockResponse,
@@ -556,7 +555,7 @@ func fireEvents(
if len(block.Evidence) != 0 {
for _, ev := range block.Evidence {
if err := eventBus.PublishEventNewEvidence(ctx, types.EventDataNewEvidence{
if err := eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{
Evidence: ev,
Height: block.Height,
}); err != nil {
@@ -572,7 +571,7 @@ func fireEvents(
}
for i, tx := range block.Data.Txs {
if err := eventBus.PublishEventTx(ctx, types.EventDataTx{
if err := eventBus.PublishEventTx(types.EventDataTx{
TxResult: abci.TxResult{
Height: block.Height,
Index: uint32(i),
@@ -585,7 +584,7 @@ func fireEvents(
}
if len(finalizeBlockResponse.ValidatorUpdates) > 0 {
if err := eventBus.PublishEventValidatorSetUpdates(ctx,
if err := eventBus.PublishEventValidatorSetUpdates(
types.EventDataValidatorSetUpdates{ValidatorUpdates: validatorUpdates}); err != nil {
logger.Error("failed publishing event", "err", err)
}
@@ -644,7 +643,7 @@ func ExecCommitBlock(
}
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()}
fireEvents(ctx, be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
fireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates)
}
// Commit block, get hash back

View File

@@ -71,7 +71,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
t.Cleanup(service.Wait)
// publish block with txs
err = eventBus.PublishEventNewBlockHeader(ctx, types.EventDataNewBlockHeader{
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
NumTxs: int64(2),
})
@@ -82,7 +82,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
Tx: types.Tx("foo"),
Result: abci.ExecTxResult{Code: 0},
}
err = eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: *txResult1})
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
require.NoError(t, err)
txResult2 := &abci.TxResult{
Height: 1,
@@ -90,7 +90,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
Tx: types.Tx("bar"),
Result: abci.ExecTxResult{Code: 0},
}
err = eventBus.PublishEventTx(ctx, types.EventDataTx{TxResult: *txResult2})
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

View File

@@ -333,7 +333,7 @@ func (r *Reactor) OnStop() {
// of historical blocks before participating in consensus
func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
if r.eventBus != nil {
if err := r.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{
if err := r.eventBus.PublishEventStateSyncStatus(types.EventDataStateSyncStatus{
Complete: false,
Height: r.initialHeight,
}); err != nil {
@@ -387,7 +387,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
}
if r.eventBus != nil {
if err := r.eventBus.PublishEventStateSyncStatus(ctx, types.EventDataStateSyncStatus{
if err := r.eventBus.PublishEventStateSyncStatus(types.EventDataStateSyncStatus{
Complete: true,
Height: state.LastBlockHeight,
}); err != nil {

View File

@@ -2,7 +2,6 @@
package events
import (
"context"
"sync"
)
@@ -20,7 +19,7 @@ type Eventable interface {
//
// FireEvent fires an event with the given name and data.
type Fireable interface {
FireEvent(ctx context.Context, eventValue string, data EventData)
FireEvent(eventValue string, data EventData)
}
// EventSwitch is the interface for synchronous pubsub, where listeners
@@ -62,7 +61,7 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E
return nil
}
func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
func (evsw *eventSwitch) FireEvent(event string, data EventData) {
// Get the eventCell
evsw.mtx.RLock()
eventCell := evsw.eventCells[event]
@@ -73,12 +72,12 @@ func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data Event
}
// Fire event for all listeners in eventCell
eventCell.fireEvent(ctx, data)
eventCell.fireEvent(data)
}
//-----------------------------------------------------------------------------
type EventCallback func(ctx context.Context, data EventData) error
type EventCallback func(data EventData) error
// eventCell handles keeping track of listener callbacks for a given event.
type eventCell struct {
@@ -98,7 +97,7 @@ func (cell *eventCell) addListener(listenerID string, cb EventCallback) {
cell.listeners[listenerID] = cb
}
func (cell *eventCell) fireEvent(ctx context.Context, data EventData) {
func (cell *eventCell) fireEvent(data EventData) {
cell.mtx.RLock()
eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
for _, cb := range cell.listeners {
@@ -107,7 +106,7 @@ func (cell *eventCell) fireEvent(ctx context.Context, data EventData) {
cell.mtx.RUnlock()
for _, cb := range eventCallbacks {
if err := cb(ctx, data); err != nil {
if err := cb(data); err != nil {
// should we log or abort here?
continue
}

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/require"
)
@@ -20,7 +21,7 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
messages := make(chan EventData)
require.NoError(t, evsw.AddListenerForEvent("listener", "event",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case messages <- data:
return nil
@@ -28,7 +29,7 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
return ctx.Err()
}
}))
go evsw.FireEvent(ctx, "event", "data")
go evsw.FireEvent("event", "data")
received := <-messages
if received != "data" {
t.Errorf("message received does not match: %v", received)
@@ -48,7 +49,7 @@ func TestAddListenerForEventFireMany(t *testing.T) {
numbers := make(chan uint64, 4)
// subscribe one listener for one event
require.NoError(t, evsw.AddListenerForEvent("listener", "event",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers <- data.(uint64):
return nil
@@ -75,6 +76,8 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Cleanup(leaktest.Check(t))
evsw := NewEventSwitch()
doneSum := make(chan uint64)
@@ -84,7 +87,7 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
numbers := make(chan uint64, 4)
// subscribe one listener to three events
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers <- data.(uint64):
return nil
@@ -93,7 +96,7 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers <- data.(uint64):
return nil
@@ -102,7 +105,7 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener", "event3",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers <- data.(uint64):
return nil
@@ -135,6 +138,8 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Cleanup(leaktest.Check(t))
evsw := NewEventSwitch()
doneSum1 := make(chan uint64)
@@ -146,7 +151,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
numbers2 := make(chan uint64, 4)
// subscribe two listener to three events
require.NoError(t, evsw.AddListenerForEvent("listener1", "event1",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
@@ -155,7 +160,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener1", "event2",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
@@ -164,7 +169,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener1", "event3",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
@@ -173,7 +178,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener2", "event2",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
@@ -182,7 +187,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener2", "event3",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
@@ -238,7 +243,7 @@ func TestManageListenersAsync(t *testing.T) {
numbers2 := make(chan uint64, 4)
// subscribe two listener to three events
require.NoError(t, evsw.AddListenerForEvent("listener1", "event1",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
@@ -247,7 +252,7 @@ func TestManageListenersAsync(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener1", "event2",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
@@ -256,7 +261,7 @@ func TestManageListenersAsync(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener1", "event3",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
@@ -265,7 +270,7 @@ func TestManageListenersAsync(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener2", "event1",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
@@ -274,7 +279,7 @@ func TestManageListenersAsync(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener2", "event2",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
@@ -283,7 +288,7 @@ func TestManageListenersAsync(t *testing.T) {
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener2", "event3",
func(ctx context.Context, data EventData) error {
func(data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
@@ -303,7 +308,7 @@ func TestManageListenersAsync(t *testing.T) {
eventNumber := r1.Intn(3) + 1
go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), //nolint:errcheck // ignore for tests
fmt.Sprintf("event%v", eventNumber),
func(context.Context, EventData) error { return nil })
func(EventData) error { return nil })
}
}
addListenersStress()
@@ -358,7 +363,7 @@ func fireEvents(ctx context.Context, evsw Fireable, event string, doneChan chan
break
}
evsw.FireEvent(ctx, event, i)
evsw.FireEvent(event, i)
sentSum += i
}

View File

@@ -69,7 +69,7 @@ func BenchmarkSequence(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
headers, vals, _ := genLightBlocksWithKeys(b, chainID, 1000, 100, 1, bTime)
headers, vals, _ := genLightBlocksWithKeys(b, 1000, 100, 1, bTime)
benchmarkFullNode := newProviderBenchmarkImpl(headers, vals)
genesisBlock, _ := benchmarkFullNode.LightBlock(ctx, 1)
@@ -106,7 +106,7 @@ func BenchmarkBisection(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
headers, vals, _ := genLightBlocksWithKeys(b, chainID, 1000, 100, 1, bTime)
headers, vals, _ := genLightBlocksWithKeys(b, 1000, 100, 1, bTime)
benchmarkFullNode := newProviderBenchmarkImpl(headers, vals)
genesisBlock, _ := benchmarkFullNode.LightBlock(ctx, 1)
@@ -142,7 +142,7 @@ func BenchmarkBackwards(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
headers, vals, _ := genLightBlocksWithKeys(b, chainID, 1000, 100, 1, bTime)
headers, vals, _ := genLightBlocksWithKeys(b, 1000, 100, 1, bTime)
benchmarkFullNode := newProviderBenchmarkImpl(headers, vals)
trustedBlock, _ := benchmarkFullNode.LightBlock(ctx, 0)

View File

@@ -387,7 +387,7 @@ func TestClient(t *testing.T) {
// the appropriate range
numBlocks := int64(300)
mockHeaders, mockVals, _ := genLightBlocksWithKeys(t, chainID, numBlocks, 101, 2, bTime)
mockHeaders, mockVals, _ := genLightBlocksWithKeys(t, numBlocks, 101, 2, bTime)
lastBlock := &types.LightBlock{SignedHeader: mockHeaders[numBlocks], ValidatorSet: mockVals[numBlocks]}
mockNode := &provider_mocks.Provider{}
@@ -773,7 +773,7 @@ func TestClient(t *testing.T) {
logger := log.NewNopLogger()
{
headers, vals, _ := genLightBlocksWithKeys(t, chainID, 9, 3, 0, bTime)
headers, vals, _ := genLightBlocksWithKeys(t, 9, 3, 0, bTime)
delete(headers, 1)
delete(headers, 2)
delete(vals, 1)

View File

@@ -35,7 +35,7 @@ func TestLightClientAttackEvidence_Lunatic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t, chainID, latestHeight, valSize, 2, bTime)
witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t, latestHeight, valSize, 2, bTime)
forgedKeys := chainKeys[divergenceHeight-1].ChangeKeys(3) // we change 3 out of the 5 validators (still 2/5 remain)
forgedVals := forgedKeys.ToValidators(2, 0)
@@ -153,7 +153,7 @@ func TestLightClientAttackEvidence_Equivocation(t *testing.T) {
// validators don't change in this network (however we still use a map just for convenience)
primaryValidators = make(map[int64]*types.ValidatorSet, testCase.latestHeight)
)
witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t, chainID,
witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t,
testCase.latestHeight+1, valSize, 2, bTime)
for height := int64(1); height <= testCase.latestHeight; height++ {
if height < testCase.divergenceHeight {
@@ -250,7 +250,7 @@ func TestLightClientAttackEvidence_ForwardLunatic(t *testing.T) {
defer cancel()
logger := log.NewNopLogger()
witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t, chainID, latestHeight, valSize, 2, bTime)
witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t, latestHeight, valSize, 2, bTime)
for _, unusedHeader := range []int64{3, 5, 6, 8} {
delete(witnessHeaders, unusedHeader)
}
@@ -401,13 +401,13 @@ func TestClientDivergentTraces1(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
headers, vals, _ := genLightBlocksWithKeys(t, chainID, 1, 5, 2, bTime)
headers, vals, _ := genLightBlocksWithKeys(t, 1, 5, 2, bTime)
mockPrimary := mockNodeFromHeadersAndVals(headers, vals)
mockPrimary.On("ID").Return("mockPrimary")
firstBlock, err := mockPrimary.LightBlock(ctx, 1)
require.NoError(t, err)
headers, vals, _ = genLightBlocksWithKeys(t, chainID, 1, 5, 2, bTime)
headers, vals, _ = genLightBlocksWithKeys(t, 1, 5, 2, bTime)
mockWitness := mockNodeFromHeadersAndVals(headers, vals)
mockWitness.On("ID").Return("mockWitness")
@@ -439,7 +439,7 @@ func TestClientDivergentTraces2(t *testing.T) {
defer cancel()
logger := log.NewNopLogger()
headers, vals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime)
headers, vals, _ := genLightBlocksWithKeys(t, 2, 5, 2, bTime)
mockPrimaryNode := mockNodeFromHeadersAndVals(headers, vals)
mockPrimaryNode.On("ID").Return("mockPrimaryNode")
@@ -485,7 +485,7 @@ func TestClientDivergentTraces3(t *testing.T) {
logger := log.NewNopLogger()
//
primaryHeaders, primaryVals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime)
primaryHeaders, primaryVals, _ := genLightBlocksWithKeys(t, 2, 5, 2, bTime)
mockPrimary := mockNodeFromHeadersAndVals(primaryHeaders, primaryVals)
mockPrimary.On("ID").Return("mockPrimary")
@@ -495,7 +495,7 @@ func TestClientDivergentTraces3(t *testing.T) {
firstBlock, err := mockPrimary.LightBlock(ctx, 1)
require.NoError(t, err)
mockHeaders, mockVals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime)
mockHeaders, mockVals, _ := genLightBlocksWithKeys(t, 2, 5, 2, bTime)
mockHeaders[1] = primaryHeaders[1]
mockVals[1] = primaryVals[1]
mockWitness := mockNodeFromHeadersAndVals(mockHeaders, mockVals)
@@ -530,7 +530,7 @@ func TestClientDivergentTraces4(t *testing.T) {
logger := log.NewNopLogger()
//
primaryHeaders, primaryVals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime)
primaryHeaders, primaryVals, _ := genLightBlocksWithKeys(t, 2, 5, 2, bTime)
mockPrimary := mockNodeFromHeadersAndVals(primaryHeaders, primaryVals)
mockPrimary.On("ID").Return("mockPrimary")
@@ -540,7 +540,7 @@ func TestClientDivergentTraces4(t *testing.T) {
firstBlock, err := mockPrimary.LightBlock(ctx, 1)
require.NoError(t, err)
witnessHeaders, witnessVals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime)
witnessHeaders, witnessVals, _ := genLightBlocksWithKeys(t, 2, 5, 2, bTime)
primaryHeaders[2] = witnessHeaders[2]
primaryVals[2] = witnessVals[2]
mockWitness := mockNodeFromHeadersAndVals(primaryHeaders, primaryVals)

View File

@@ -160,7 +160,6 @@ func (pkz privKeys) ChangeKeys(delta int) privKeys {
// NOTE: Expected to have a large validator set size ~ 100 validators.
func genLightBlocksWithKeys(
t testing.TB,
chainID string,
numBlocks int64,
valSize int,
valVariation float32,

View File

@@ -292,16 +292,32 @@ func makeNode(
blockSync := !onlyValidatorIsUs(state, pubKey)
waitSync := stateSync || blockSync
csReactor, csState, err := createConsensusReactor(ctx,
cfg, stateStore, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, waitSync, eventBus,
peerManager, node.router.OpenChannel, logger,
csState, err := consensus.NewState(logger.With("module", "consensus"),
cfg.Consensus,
stateStore,
blockExec,
blockStore,
mp,
evPool,
eventBus,
consensus.StateMetrics(nodeMetrics.consensus),
consensus.SkipStateStoreBootstrap,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
node.services = append(node.services, csReactor)
node.rpcEnv.ConsensusState = csState
csReactor := consensus.NewReactor(
logger,
csState,
node.router.OpenChannel,
peerManager.Subscribe,
eventBus,
waitSync,
nodeMetrics.consensus,
)
node.services = append(node.services, csReactor)
node.rpcEnv.ConsensusReactor = csReactor
// Create the blockchain reactor. Note, we do not start block sync if we're
@@ -370,6 +386,9 @@ func makeNode(
))
if cfg.Mode == config.ModeValidator {
if privValidator != nil {
csState.SetPrivValidator(ctx, privValidator)
}
node.rpcEnv.PubKey = pubKey
}

View File

@@ -231,56 +231,6 @@ func createEvidenceReactor(
return evidenceReactor, evidencePool, dbCloser, nil
}
func createConsensusReactor(
ctx context.Context,
cfg *config.Config,
store sm.Store,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mp mempool.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *consensus.Metrics,
waitSync bool,
eventBus *eventbus.EventBus,
peerManager *p2p.PeerManager,
chCreator p2p.ChannelCreator,
logger log.Logger,
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
consensusState, err := consensus.NewState(ctx,
logger,
cfg.Consensus,
store,
blockExec,
blockStore,
mp,
evidencePool,
eventBus,
consensus.StateMetrics(csMetrics),
consensus.SkipStateStoreBootstrap,
)
if err != nil {
return nil, nil, err
}
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(ctx, privValidator)
}
reactor := consensus.NewReactor(
logger,
consensusState,
chCreator,
peerManager.Subscribe,
eventBus,
waitSync,
csMetrics,
)
return reactor, consensusState, nil
}
func createPeerManager(
cfg *config.Config,
dbProvider config.DBProvider,

View File

@@ -1,7 +1,6 @@
package types
import (
"context"
"fmt"
"strings"
@@ -308,15 +307,15 @@ func QueryForEvent(eventValue string) *tmquery.Query {
// BlockEventPublisher publishes all block related events
type BlockEventPublisher interface {
PublishEventNewBlock(ctx context.Context, block EventDataNewBlock) error
PublishEventNewBlockHeader(ctx context.Context, header EventDataNewBlockHeader) error
PublishEventNewEvidence(ctx context.Context, evidence EventDataNewEvidence) error
PublishEventTx(context.Context, EventDataTx) error
PublishEventValidatorSetUpdates(context.Context, EventDataValidatorSetUpdates) error
PublishEventNewBlock(EventDataNewBlock) error
PublishEventNewBlockHeader(EventDataNewBlockHeader) error
PublishEventNewEvidence(EventDataNewEvidence) error
PublishEventTx(EventDataTx) error
PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates) error
}
type TxEventPublisher interface {
PublishEventTx(context.Context, EventDataTx) error
PublishEventTx(EventDataTx) error
}
// eventWithAttr constructs a single abci.Event with a single attribute.