mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-24 09:40:32 +00:00
node: pass eventbus at construction time (#8084)
* node: pass eventbus at construction time * remove cruft
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/internal/consensus"
|
||||
"github.com/tendermint/tendermint/internal/eventbus"
|
||||
"github.com/tendermint/tendermint/internal/mempool/mock"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
|
||||
@@ -121,6 +122,9 @@ func (rts *reactorTestSuite) addNode(
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, stateStore.Save(state))
|
||||
|
||||
eventbus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventbus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
@@ -128,6 +132,7 @@ func (rts *reactorTestSuite) addNode(
|
||||
mock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventbus,
|
||||
)
|
||||
|
||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
|
||||
@@ -95,14 +95,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus)
|
||||
|
||||
// Make State
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
|
||||
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus)
|
||||
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
|
||||
require.NoError(t, err)
|
||||
// set private validator
|
||||
pv := privVals[i]
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
|
||||
cs.SetEventBus(eventBus)
|
||||
cs.SetTimeoutTicker(tickerFunc())
|
||||
|
||||
states[i] = cs
|
||||
|
||||
@@ -482,7 +482,10 @@ func newStateWithConfigAndBlockStore(
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
require.NoError(t, stateStore.Save(state))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
|
||||
eventBus := eventbus.NewDefault(logger.With("module", "events"))
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus)
|
||||
cs, err := NewState(ctx,
|
||||
logger.With("module", "consensus"),
|
||||
thisConfig.Consensus,
|
||||
@@ -491,6 +494,7 @@ func newStateWithConfigAndBlockStore(
|
||||
blockStore,
|
||||
mempool,
|
||||
evpool,
|
||||
eventBus,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -498,10 +502,6 @@ func newStateWithConfigAndBlockStore(
|
||||
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger.With("module", "events"))
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
cs.SetEventBus(eventBus)
|
||||
return cs
|
||||
}
|
||||
|
||||
|
||||
@@ -138,6 +138,7 @@ func NewReactor(
|
||||
cs *State,
|
||||
channelCreator p2p.ChannelCreator,
|
||||
peerUpdates *p2p.PeerUpdates,
|
||||
eventBus *eventbus.EventBus,
|
||||
waitSync bool,
|
||||
metrics *Metrics,
|
||||
) (*Reactor, error) {
|
||||
@@ -166,6 +167,7 @@ func NewReactor(
|
||||
state: cs,
|
||||
waitSync: waitSync,
|
||||
peers: make(map[types.NodeID]*PeerState),
|
||||
eventBus: eventBus,
|
||||
Metrics: metrics,
|
||||
stateCh: stateCh,
|
||||
dataCh: dataCh,
|
||||
@@ -226,12 +228,6 @@ func (r *Reactor) OnStop() {
|
||||
}
|
||||
}
|
||||
|
||||
// SetEventBus sets the reactor's event bus.
|
||||
func (r *Reactor) SetEventBus(b *eventbus.EventBus) {
|
||||
r.eventBus = b
|
||||
r.state.SetEventBus(b)
|
||||
}
|
||||
|
||||
// WaitSync returns whether the consensus reactor is waiting for state/block sync.
|
||||
func (r *Reactor) WaitSync() bool {
|
||||
r.mtx.RLock()
|
||||
|
||||
@@ -110,13 +110,12 @@ func setup(
|
||||
state,
|
||||
chCreator(nodeID),
|
||||
node.MakePeerUpdates(ctx, t),
|
||||
state.eventBus,
|
||||
true,
|
||||
NopMetrics(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
reactor.SetEventBus(state.eventBus)
|
||||
|
||||
blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
|
||||
ClientID: testSubscriber,
|
||||
Query: types.EventQueryNewBlock,
|
||||
@@ -504,16 +503,15 @@ func TestReactorWithEvidence(t *testing.T) {
|
||||
|
||||
evpool2 := sm.EmptyEvidencePool{}
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
|
||||
|
||||
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
|
||||
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2)
|
||||
require.NoError(t, err)
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
|
||||
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
cs.SetEventBus(eventBus)
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus)
|
||||
|
||||
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
|
||||
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus)
|
||||
require.NoError(t, err)
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
|
||||
cs.SetTimeoutTicker(tickerFunc())
|
||||
|
||||
|
||||
@@ -205,7 +205,7 @@ type Handshaker struct {
|
||||
stateStore sm.Store
|
||||
initialState sm.State
|
||||
store sm.BlockStore
|
||||
eventBus types.BlockEventPublisher
|
||||
eventBus *eventbus.EventBus
|
||||
genDoc *types.GenesisDoc
|
||||
logger log.Logger
|
||||
|
||||
@@ -217,7 +217,7 @@ func NewHandshaker(
|
||||
stateStore sm.Store,
|
||||
state sm.State,
|
||||
store sm.BlockStore,
|
||||
eventBus types.BlockEventPublisher,
|
||||
eventBus *eventbus.EventBus,
|
||||
genDoc *types.GenesisDoc,
|
||||
) *Handshaker {
|
||||
|
||||
@@ -484,9 +484,7 @@ func (h *Handshaker) replayBlocks(
|
||||
if i == finalBlock && !mutateState {
|
||||
// We emit events for the index services at the final block due to the sync issue when
|
||||
// the node shutdown during the block committing status.
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
|
||||
blockExec.SetEventBus(h.eventBus)
|
||||
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store, h.eventBus)
|
||||
appHash, err = sm.ExecCommitBlock(ctx,
|
||||
blockExec, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
@@ -528,8 +526,7 @@ func (h *Handshaker) replayBlock(
|
||||
|
||||
// Use stubs for both mempool and evidence pool since no transactions nor
|
||||
// evidence are needed here - block already exists.
|
||||
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
|
||||
blockExec.SetEventBus(h.eventBus)
|
||||
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store, h.eventBus)
|
||||
|
||||
var err error
|
||||
state, err = blockExec.ApplyBlock(ctx, state, meta.BlockID, block)
|
||||
|
||||
@@ -146,11 +146,10 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event
|
||||
pb.cs.Wait()
|
||||
|
||||
newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
|
||||
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
|
||||
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, pb.cs.eventBus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newCS.SetEventBus(pb.cs.eventBus)
|
||||
newCS.startForReplay()
|
||||
|
||||
if err := pb.fp.Close(); err != nil {
|
||||
@@ -349,13 +348,12 @@ func newConsensusStateForReplay(
|
||||
}
|
||||
|
||||
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore, eventBus)
|
||||
|
||||
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
|
||||
blockStore, mempool, evpool)
|
||||
blockStore, mempool, evpool, eventBus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
consensusState.SetEventBus(eventBus)
|
||||
return consensusState, nil
|
||||
}
|
||||
|
||||
@@ -748,6 +748,9 @@ func testHandshakeReplay(
|
||||
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
|
||||
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
|
||||
if nBlocks > 0 {
|
||||
// run nBlocks against a new client to build up the app state.
|
||||
@@ -757,7 +760,7 @@ func testHandshakeReplay(
|
||||
stateStore := sm.NewStore(stateDB1)
|
||||
err := stateStore.Save(genesisState)
|
||||
require.NoError(t, err)
|
||||
buildAppStateFromChain(ctx, t, proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, nBlocks, mode, store)
|
||||
buildAppStateFromChain(ctx, t, proxyApp, stateStore, sim.Mempool, sim.Evpool, genesisState, chain, eventBus, nBlocks, mode, store)
|
||||
}
|
||||
|
||||
// Prune block store if requested
|
||||
@@ -772,7 +775,7 @@ func testHandshakeReplay(
|
||||
// now start the app using the handshake - it should sync
|
||||
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
|
||||
require.NoError(t, err)
|
||||
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
|
||||
handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
|
||||
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
|
||||
require.True(t, proxyApp.IsRunning())
|
||||
@@ -822,9 +825,10 @@ func applyBlock(
|
||||
blk *types.Block,
|
||||
appClient abciclient.Client,
|
||||
blockStore *mockBlockStore,
|
||||
eventBus *eventbus.EventBus,
|
||||
) sm.State {
|
||||
testPartSize := types.BlockPartSizeBytes
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore, eventBus)
|
||||
|
||||
bps, err := blk.MakePartSet(testPartSize)
|
||||
require.NoError(t, err)
|
||||
@@ -843,6 +847,7 @@ func buildAppStateFromChain(
|
||||
evpool sm.EvidencePool,
|
||||
state sm.State,
|
||||
chain []*types.Block,
|
||||
eventBus *eventbus.EventBus,
|
||||
nBlocks int,
|
||||
mode uint,
|
||||
blockStore *mockBlockStore,
|
||||
@@ -864,18 +869,18 @@ func buildAppStateFromChain(
|
||||
case 0:
|
||||
for i := 0; i < nBlocks; i++ {
|
||||
block := chain[i]
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore, eventBus)
|
||||
}
|
||||
case 1, 2, 3:
|
||||
for i := 0; i < nBlocks-1; i++ {
|
||||
block := chain[i]
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore, eventBus)
|
||||
}
|
||||
|
||||
if mode == 2 || mode == 3 {
|
||||
// update the kvstore height and apphash
|
||||
// as if we ran commit but not
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore)
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore, eventBus)
|
||||
}
|
||||
default:
|
||||
require.Fail(t, "unknown mode %v", mode)
|
||||
@@ -917,23 +922,26 @@ func buildTMStateFromChain(
|
||||
|
||||
require.NoError(t, stateStore.Save(state))
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
switch mode {
|
||||
case 0:
|
||||
// sync right up
|
||||
for _, block := range chain {
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore, eventBus)
|
||||
}
|
||||
|
||||
case 1, 2, 3:
|
||||
// sync up to the penultimate as if we stored the block.
|
||||
// whether we commit or not depends on the appHash
|
||||
for _, block := range chain[:len(chain)-1] {
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
|
||||
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore, eventBus)
|
||||
}
|
||||
|
||||
// apply the final block to a state copy so we can
|
||||
// get the right next appHash but keep the state back
|
||||
applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore)
|
||||
applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[len(chain)-1], proxyApp, blockStore, eventBus)
|
||||
default:
|
||||
require.Fail(t, "unknown mode %v", mode)
|
||||
}
|
||||
@@ -970,6 +978,9 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
|
||||
|
||||
logger := log.TestingLogger()
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
// 2. Tendermint must panic if app returns wrong hash for the first block
|
||||
// - RANDOM HASH
|
||||
// - 0x02
|
||||
@@ -983,7 +994,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
|
||||
t.Cleanup(func() { cancel(); proxyApp.Wait() })
|
||||
|
||||
assert.Panics(t, func() {
|
||||
h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
|
||||
h := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
|
||||
if err = h.Handshake(ctx, proxyApp); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
@@ -1003,7 +1014,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
|
||||
t.Cleanup(func() { cancel(); proxyApp.Wait() })
|
||||
|
||||
assert.Panics(t, func() {
|
||||
h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
|
||||
h := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
|
||||
if err = h.Handshake(ctx, proxyApp); err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
@@ -1235,6 +1246,9 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
|
||||
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
|
||||
client := abciclient.NewLocalClient(logger, app)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
cfg, err := ResetConfig(t.TempDir(), "handshake_test_")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(cfg.RootDir) })
|
||||
@@ -1252,7 +1266,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
|
||||
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
|
||||
require.NoError(t, err)
|
||||
|
||||
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
|
||||
handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
|
||||
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
|
||||
|
||||
|
||||
@@ -197,9 +197,11 @@ func NewState(
|
||||
blockStore sm.BlockStore,
|
||||
txNotifier txNotifier,
|
||||
evpool evidencePool,
|
||||
eventBus *eventbus.EventBus,
|
||||
options ...StateOption,
|
||||
) (*State, error) {
|
||||
cs := &State{
|
||||
eventBus: eventBus,
|
||||
logger: logger,
|
||||
config: cfg,
|
||||
blockExec: blockExec,
|
||||
@@ -260,12 +262,6 @@ func (cs *State) updateStateFromStore(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetEventBus sets event bus.
|
||||
func (cs *State) SetEventBus(b *eventbus.EventBus) {
|
||||
cs.eventBus = b
|
||||
cs.blockExec.SetEventBus(b)
|
||||
}
|
||||
|
||||
// StateMetrics sets the metrics.
|
||||
func StateMetrics(metrics *Metrics) StateOption {
|
||||
return func(cs *State) { cs.metrics = metrics }
|
||||
|
||||
@@ -82,13 +82,12 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
|
||||
|
||||
mempool := emptyMempool{}
|
||||
evpool := sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
|
||||
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore, eventBus)
|
||||
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
consensusState.SetEventBus(eventBus)
|
||||
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
|
||||
consensusState.SetPrivValidator(ctx, privValidator)
|
||||
}
|
||||
|
||||
@@ -194,28 +194,3 @@ func (b *EventBus) PublishEventValidatorSetUpdates(ctx context.Context, data typ
|
||||
func (b *EventBus) PublishEventEvidenceValidated(ctx context.Context, evidence types.EventDataEvidenceValidated) error {
|
||||
return b.Publish(ctx, types.EventEvidenceValidatedValue, evidence)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// NopEventBus implements a types.BlockEventPublisher that discards all events.
|
||||
type NopEventBus struct{}
|
||||
|
||||
func (NopEventBus) PublishEventNewBlock(context.Context, types.EventDataNewBlock) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (NopEventBus) PublishEventNewBlockHeader(context.Context, types.EventDataNewBlockHeader) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (NopEventBus) PublishEventNewEvidence(context.Context, types.EventDataNewEvidence) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (NopEventBus) PublishEventTx(context.Context, types.EventDataTx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (NopEventBus) PublishEventValidatorSetUpdates(context.Context, types.EventDataValidatorSetUpdates) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -64,12 +64,13 @@ func NewBlockExecutor(
|
||||
pool mempool.Mempool,
|
||||
evpool EvidencePool,
|
||||
blockStore BlockStore,
|
||||
eventBus *eventbus.EventBus,
|
||||
options ...BlockExecutorOption,
|
||||
) *BlockExecutor {
|
||||
res := &BlockExecutor{
|
||||
eventBus: eventBus,
|
||||
store: stateStore,
|
||||
appClient: appClient,
|
||||
eventBus: eventbus.NopEventBus{},
|
||||
mempool: pool,
|
||||
evpool: evpool,
|
||||
logger: logger,
|
||||
@@ -89,12 +90,6 @@ func (blockExec *BlockExecutor) Store() Store {
|
||||
return blockExec.store
|
||||
}
|
||||
|
||||
// SetEventBus - sets the event bus for publishing block related events.
|
||||
// If not called, it defaults to types.NopEventBus.
|
||||
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
|
||||
blockExec.eventBus = eventBus
|
||||
}
|
||||
|
||||
// CreateProposalBlock calls state.MakeBlock with evidence from the evpool
|
||||
// and txs from the mempool. The max bytes must be big enough to fit the commit.
|
||||
// Up to 1/10th of the block space is allcoated for maximum sized evidence.
|
||||
|
||||
@@ -45,14 +45,15 @@ func TestApplyBlock(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, proxyApp.Start(ctx))
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
state, stateDB, _ := makeState(t, 1, 1)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp,
|
||||
mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, eventBus)
|
||||
|
||||
block, err := sf.MakeBlock(state, 1, new(types.Commit))
|
||||
require.NoError(t, err)
|
||||
@@ -103,7 +104,10 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) {
|
||||
evpool.On("Update", ctx, mock.Anything, mock.Anything).Return()
|
||||
evpool.On("CheckEvidence", ctx, mock.Anything).Return(nil)
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mmock.Mempool{}, evpool, blockStore)
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mmock.Mempool{}, evpool, blockStore, eventBus)
|
||||
state, _, lastCommit := makeAndCommitGoodBlock(ctx, t, state, 1, new(types.Commit), state.NextValidators.Validators[0].Address, blockExec, privVals, nil)
|
||||
|
||||
for idx, isAbsent := range tc.absentCommitSigs {
|
||||
@@ -212,10 +216,13 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) {
|
||||
evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
|
||||
evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp,
|
||||
mmock.Mempool{}, evpool, blockStore)
|
||||
mmock.Mempool{}, evpool, blockStore, eventBus)
|
||||
|
||||
block, err := sf.MakeBlock(state, 1, new(types.Commit))
|
||||
require.NoError(t, err)
|
||||
@@ -250,6 +257,9 @@ func TestProcessProposal(t *testing.T) {
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
@@ -257,6 +267,7 @@ func TestProcessProposal(t *testing.T) {
|
||||
mmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
|
||||
block0, err := sf.MakeBlock(state, height-1, new(types.Commit))
|
||||
@@ -453,6 +464,9 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
@@ -460,15 +474,9 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
|
||||
mmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
err = eventBus.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
defer eventBus.Stop()
|
||||
|
||||
blockExec.SetEventBus(eventBus)
|
||||
|
||||
updatesSub, err := eventBus.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
|
||||
ClientID: "TestFinalizeBlockValidatorUpdates",
|
||||
Query: types.EventQueryValidatorSetUpdates,
|
||||
@@ -524,6 +532,9 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
state, stateDB, _ := makeState(t, 1, 1)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
@@ -534,6 +545,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
|
||||
mmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
|
||||
block, err := sf.MakeBlock(state, 1, new(types.Commit))
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/crypto/tmhash"
|
||||
"github.com/tendermint/tendermint/internal/eventbus"
|
||||
memmock "github.com/tendermint/tendermint/internal/mempool/mock"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
@@ -36,6 +37,9 @@ func TestValidateBlockHeader(t *testing.T) {
|
||||
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx))
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
state, stateDB, privVals := makeState(t, 3, 1)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
@@ -46,6 +50,7 @@ func TestValidateBlockHeader(t *testing.T) {
|
||||
memmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)
|
||||
|
||||
@@ -125,6 +130,9 @@ func TestValidateBlockCommit(t *testing.T) {
|
||||
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx))
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
state, stateDB, privVals := makeState(t, 1, 1)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
@@ -135,6 +143,7 @@ func TestValidateBlockCommit(t *testing.T) {
|
||||
memmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)
|
||||
wrongSigsCommit := types.NewCommit(1, 0, types.BlockID{}, nil)
|
||||
@@ -263,6 +272,9 @@ func TestValidateBlockEvidence(t *testing.T) {
|
||||
evpool.On("ABCIEvidence", mock.AnythingOfType("int64"), mock.AnythingOfType("[]types.Evidence")).Return(
|
||||
[]abci.Evidence{})
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
state.ConsensusParams.Evidence.MaxBytes = 1000
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
@@ -271,6 +283,7 @@ func TestValidateBlockEvidence(t *testing.T) {
|
||||
memmock.Mempool{},
|
||||
evpool,
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
lastCommit := types.NewCommit(0, 0, types.BlockID{}, nil)
|
||||
|
||||
|
||||
@@ -289,6 +289,7 @@ func makeNode(
|
||||
mp,
|
||||
evPool,
|
||||
blockStore,
|
||||
eventBus,
|
||||
sm.BlockExecutorWithMetrics(nodeMetrics.state),
|
||||
)
|
||||
|
||||
|
||||
@@ -323,6 +323,8 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
@@ -330,6 +332,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
mp,
|
||||
evidencePool,
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
|
||||
commit := types.NewCommit(height-1, 0, types.BlockID{}, nil)
|
||||
@@ -398,6 +401,9 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
@@ -405,6 +411,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
mp,
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
|
||||
commit := types.NewCommit(height-1, 0, types.BlockID{}, nil)
|
||||
@@ -469,6 +476,9 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
eventBus := eventbus.NewDefault(logger)
|
||||
require.NoError(t, eventBus.Start(ctx))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
@@ -476,6 +486,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
mp,
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
eventBus,
|
||||
)
|
||||
|
||||
blockID := types.BlockID{
|
||||
|
||||
@@ -270,6 +270,7 @@ func createConsensusReactor(
|
||||
blockStore,
|
||||
mp,
|
||||
evidencePool,
|
||||
eventBus,
|
||||
consensus.StateMetrics(csMetrics),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -286,6 +287,7 @@ func createConsensusReactor(
|
||||
consensusState,
|
||||
router.OpenChannel,
|
||||
peerManager.Subscribe(ctx),
|
||||
eventBus,
|
||||
waitSync,
|
||||
csMetrics,
|
||||
)
|
||||
@@ -293,9 +295,6 @@ func createConsensusReactor(
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Services which will be publishing and/or subscribing for messages (events)
|
||||
// consensusReactor will set it on consensusState and blockExecutor.
|
||||
reactor.SetEventBus(eventBus)
|
||||
return reactor, consensusState, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user