mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-21 22:51:30 +00:00
Merge remote-tracking branch 'origin/master' into proxy-remove-triforcated-client
This commit is contained in:
@@ -70,6 +70,8 @@ type Reactor struct {
|
||||
|
||||
// immutable
|
||||
initialState sm.State
|
||||
// store
|
||||
stateStore sm.Store
|
||||
|
||||
blockExec *sm.BlockExecutor
|
||||
store *store.BlockStore
|
||||
@@ -101,7 +103,7 @@ type Reactor struct {
|
||||
func NewReactor(
|
||||
ctx context.Context,
|
||||
logger log.Logger,
|
||||
state sm.State,
|
||||
stateStore sm.Store,
|
||||
blockExec *sm.BlockExecutor,
|
||||
store *store.BlockStore,
|
||||
consReactor consensusReactor,
|
||||
@@ -111,19 +113,6 @@ func NewReactor(
|
||||
metrics *consensus.Metrics,
|
||||
eventBus *eventbus.EventBus,
|
||||
) (*Reactor, error) {
|
||||
|
||||
if state.LastBlockHeight != store.Height() {
|
||||
return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
|
||||
}
|
||||
|
||||
startHeight := store.Height() + 1
|
||||
if startHeight == 1 {
|
||||
startHeight = state.InitialHeight
|
||||
}
|
||||
|
||||
requestsCh := make(chan BlockRequest, maxTotalRequesters)
|
||||
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
|
||||
|
||||
blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -131,20 +120,16 @@ func NewReactor(
|
||||
|
||||
r := &Reactor{
|
||||
logger: logger,
|
||||
initialState: state,
|
||||
stateStore: stateStore,
|
||||
blockExec: blockExec,
|
||||
store: store,
|
||||
pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh),
|
||||
consReactor: consReactor,
|
||||
blockSync: newAtomicBool(blockSync),
|
||||
requestsCh: requestsCh,
|
||||
errorsCh: errorsCh,
|
||||
blockSyncCh: blockSyncCh,
|
||||
blockSyncOutBridgeCh: make(chan p2p.Envelope),
|
||||
peerUpdates: peerUpdates,
|
||||
metrics: metrics,
|
||||
eventBus: eventBus,
|
||||
syncStartTime: time.Time{},
|
||||
}
|
||||
|
||||
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
|
||||
@@ -159,6 +144,27 @@ func NewReactor(
|
||||
// If blockSync is enabled, we also start the pool and the pool processing
|
||||
// goroutine. If the pool fails to start, an error is returned.
|
||||
func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
state, err := r.stateStore.Load()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.initialState = state
|
||||
|
||||
if state.LastBlockHeight != r.store.Height() {
|
||||
return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height())
|
||||
}
|
||||
|
||||
startHeight := r.store.Height() + 1
|
||||
if startHeight == 1 {
|
||||
startHeight = state.InitialHeight
|
||||
}
|
||||
|
||||
requestsCh := make(chan BlockRequest, maxTotalRequesters)
|
||||
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
|
||||
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
|
||||
r.requestsCh = requestsCh
|
||||
r.errorsCh = errorsCh
|
||||
|
||||
if r.blockSync.IsSet() {
|
||||
if err := r.pool.Start(ctx); err != nil {
|
||||
return err
|
||||
|
||||
@@ -176,7 +176,7 @@ func (rts *reactorTestSuite) addNode(
|
||||
rts.reactors[nodeID], err = NewReactor(
|
||||
ctx,
|
||||
rts.logger.With("nodeID", nodeID),
|
||||
state.Copy(),
|
||||
stateStore,
|
||||
blockExec,
|
||||
blockStore,
|
||||
nil,
|
||||
|
||||
@@ -82,7 +82,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
log.TestingLogger().With("module", "mempool"),
|
||||
thisConfig.Mempool,
|
||||
proxyAppConnMem,
|
||||
0,
|
||||
)
|
||||
if thisConfig.Consensus.WaitForTxs() {
|
||||
mempool.EnableTxsAvailable()
|
||||
@@ -95,7 +94,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
|
||||
// Make State
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
|
||||
cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
|
||||
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
|
||||
require.NoError(t, err)
|
||||
// set private validator
|
||||
pv := privVals[i]
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
@@ -105,14 +105,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
cs.SetEventBus(eventBus)
|
||||
evpool.SetEventBus(eventBus)
|
||||
|
||||
cs.SetTimeoutTicker(tickerFunc())
|
||||
|
||||
states[i] = cs
|
||||
}()
|
||||
}
|
||||
|
||||
rts := setup(ctx, t, nValidators, states, 100) // buffer must be large enough to not deadlock
|
||||
rts := setup(ctx, t, nValidators, states, 512) // buffer must be large enough to not deadlock
|
||||
|
||||
var bzNodeID types.NodeID
|
||||
|
||||
@@ -238,8 +237,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, reactor := range rts.reactors {
|
||||
state := reactor.state.GetState()
|
||||
reactor.SwitchToConsensus(ctx, state, false)
|
||||
reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
|
||||
}
|
||||
|
||||
// Evidence should be submitted and committed at the third height but
|
||||
@@ -248,20 +246,26 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
i := 0
|
||||
subctx, subcancel := context.WithCancel(ctx)
|
||||
defer subcancel()
|
||||
for _, sub := range rts.subs {
|
||||
wg.Add(1)
|
||||
|
||||
go func(j int, s eventbus.Subscription) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
if subctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := s.Next(subctx)
|
||||
if subctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := s.Next(ctx)
|
||||
assert.NoError(t, err)
|
||||
if err != nil {
|
||||
cancel()
|
||||
t.Errorf("waiting for subscription: %v", err)
|
||||
subcancel()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -273,12 +277,18 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}(i, sub)
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// don't run more assertions if we've encountered a timeout
|
||||
select {
|
||||
case <-subctx.Done():
|
||||
t.Fatal("encountered timeout")
|
||||
default:
|
||||
}
|
||||
|
||||
pubkey, err := bzNodeState.privValidator.GetPubKey(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -469,7 +469,6 @@ func newStateWithConfigAndBlockStore(
|
||||
logger.With("module", "mempool"),
|
||||
thisConfig.Mempool,
|
||||
proxyAppConnMem,
|
||||
0,
|
||||
)
|
||||
|
||||
if thisConfig.Consensus.WaitForTxs() {
|
||||
@@ -484,15 +483,19 @@ func newStateWithConfigAndBlockStore(
|
||||
require.NoError(t, stateStore.Save(state))
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
|
||||
cs := NewState(ctx,
|
||||
cs, err := NewState(ctx,
|
||||
logger.With("module", "consensus"),
|
||||
thisConfig.Consensus,
|
||||
state,
|
||||
stateStore,
|
||||
blockExec,
|
||||
blockStore,
|
||||
mempool,
|
||||
evpool,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
|
||||
eventBus := eventbus.NewDefault(logger.With("module", "events"))
|
||||
|
||||
@@ -461,6 +461,7 @@ func TestReactorWithEvidence(t *testing.T) {
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
state, err := sm.MakeGenesisState(genDoc)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, stateStore.Save(state))
|
||||
thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i))
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -483,7 +484,6 @@ func TestReactorWithEvidence(t *testing.T) {
|
||||
log.TestingLogger().With("module", "mempool"),
|
||||
thisConfig.Mempool,
|
||||
proxyAppConnMem,
|
||||
0,
|
||||
)
|
||||
|
||||
if thisConfig.Consensus.WaitForTxs() {
|
||||
@@ -506,8 +506,9 @@ func TestReactorWithEvidence(t *testing.T) {
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
|
||||
|
||||
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
|
||||
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
|
||||
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
|
||||
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2)
|
||||
require.NoError(t, err)
|
||||
cs.SetPrivValidator(ctx, pv)
|
||||
|
||||
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
|
||||
|
||||
@@ -84,7 +84,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro
|
||||
return err
|
||||
}
|
||||
|
||||
pb := newPlayback(file, fp, cs, cs.state.Copy())
|
||||
pb := newPlayback(file, fp, cs, cs.stateStore)
|
||||
defer pb.fp.Close()
|
||||
|
||||
var nextN int // apply N msgs in a row
|
||||
@@ -126,17 +126,17 @@ type playback struct {
|
||||
count int // how many lines/msgs into the file are we
|
||||
|
||||
// replays can be reset to beginning
|
||||
fileName string // so we can close/reopen the file
|
||||
genesisState sm.State // so the replay session knows where to restart from
|
||||
fileName string // so we can close/reopen the file
|
||||
stateStore sm.Store
|
||||
}
|
||||
|
||||
func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback {
|
||||
func newPlayback(fileName string, fp *os.File, cs *State, store sm.Store) *playback {
|
||||
return &playback{
|
||||
cs: cs,
|
||||
fp: fp,
|
||||
fileName: fileName,
|
||||
genesisState: genState,
|
||||
dec: NewWALDecoder(fp),
|
||||
cs: cs,
|
||||
fp: fp,
|
||||
fileName: fileName,
|
||||
stateStore: store,
|
||||
dec: NewWALDecoder(fp),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,8 +145,11 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event
|
||||
pb.cs.Stop()
|
||||
pb.cs.Wait()
|
||||
|
||||
newCS := NewState(ctx, pb.cs.logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
|
||||
newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
|
||||
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newCS.SetEventBus(pb.cs.eventBus)
|
||||
newCS.startForReplay()
|
||||
|
||||
@@ -345,9 +348,11 @@ func newConsensusStateForReplay(
|
||||
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
|
||||
|
||||
consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec,
|
||||
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
|
||||
blockStore, mempool, evpool)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
consensusState.SetEventBus(eventBus)
|
||||
return consensusState, nil
|
||||
}
|
||||
|
||||
@@ -121,6 +121,9 @@ type State struct {
|
||||
// store blocks and commits
|
||||
blockStore sm.BlockStore
|
||||
|
||||
stateStore sm.Store
|
||||
initialStatePopulated bool
|
||||
|
||||
// create and execute blocks
|
||||
blockExec *sm.BlockExecutor
|
||||
|
||||
@@ -189,18 +192,19 @@ func NewState(
|
||||
ctx context.Context,
|
||||
logger log.Logger,
|
||||
cfg *config.ConsensusConfig,
|
||||
state sm.State,
|
||||
store sm.Store,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore sm.BlockStore,
|
||||
txNotifier txNotifier,
|
||||
evpool evidencePool,
|
||||
options ...StateOption,
|
||||
) *State {
|
||||
) (*State, error) {
|
||||
cs := &State{
|
||||
logger: logger,
|
||||
config: cfg,
|
||||
blockExec: blockExec,
|
||||
blockStore: blockStore,
|
||||
stateStore: store,
|
||||
txNotifier: txNotifier,
|
||||
peerMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||
internalMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||
@@ -220,6 +224,31 @@ func NewState(
|
||||
cs.doPrevote = cs.defaultDoPrevote
|
||||
cs.setProposal = cs.defaultSetProposal
|
||||
|
||||
if err := cs.updateStateFromStore(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
|
||||
cs.BaseService = *service.NewBaseService(logger, "State", cs)
|
||||
for _, option := range options {
|
||||
option(cs)
|
||||
}
|
||||
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
func (cs *State) updateStateFromStore(ctx context.Context) error {
|
||||
if cs.initialStatePopulated {
|
||||
return nil
|
||||
}
|
||||
state, err := cs.stateStore.Load()
|
||||
if err != nil {
|
||||
return fmt.Errorf("loading state: %w", err)
|
||||
}
|
||||
if state.IsEmpty() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We have no votes, so reconstruct LastCommit from SeenCommit.
|
||||
if state.LastBlockHeight > 0 {
|
||||
cs.reconstructLastCommit(state)
|
||||
@@ -227,14 +256,8 @@ func NewState(
|
||||
|
||||
cs.updateToState(ctx, state)
|
||||
|
||||
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
|
||||
|
||||
cs.BaseService = *service.NewBaseService(logger, "State", cs)
|
||||
for _, option := range options {
|
||||
option(cs)
|
||||
}
|
||||
|
||||
return cs
|
||||
cs.initialStatePopulated = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetEventBus sets event bus.
|
||||
@@ -365,6 +388,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
|
||||
// OnStart loads the latest state via the WAL, and starts the timeout and
|
||||
// receive routines.
|
||||
func (cs *State) OnStart(ctx context.Context) error {
|
||||
if err := cs.updateStateFromStore(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We may set the WAL in testing before calling Start, so only OpenWAL if its
|
||||
// still the nilWAL.
|
||||
if _, ok := cs.wal.(nilWAL); ok {
|
||||
|
||||
@@ -83,7 +83,11 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
|
||||
mempool := emptyMempool{}
|
||||
evpool := sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
|
||||
consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
|
||||
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
consensusState.SetEventBus(eventBus)
|
||||
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
|
||||
consensusState.SetPrivValidator(ctx, privValidator)
|
||||
|
||||
@@ -94,7 +94,6 @@ func NewTxMempool(
|
||||
logger log.Logger,
|
||||
cfg *config.MempoolConfig,
|
||||
proxyAppConn abciclient.Client,
|
||||
height int64,
|
||||
options ...TxMempoolOption,
|
||||
) *TxMempool {
|
||||
|
||||
@@ -102,7 +101,7 @@ func NewTxMempool(
|
||||
logger: logger,
|
||||
config: cfg,
|
||||
proxyAppConn: proxyAppConn,
|
||||
height: height,
|
||||
height: -1,
|
||||
cache: NopTxCache{},
|
||||
metrics: NopMetrics(),
|
||||
txStore: NewTxStore(),
|
||||
|
||||
@@ -95,7 +95,7 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
|
||||
appConnMem.Wait()
|
||||
})
|
||||
|
||||
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
|
||||
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...)
|
||||
}
|
||||
|
||||
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
|
||||
|
||||
@@ -360,8 +360,8 @@ func (blockExec *BlockExecutor) Commit(
|
||||
block.Height,
|
||||
block.Txs,
|
||||
deliverTxResponses,
|
||||
TxPreCheck(state),
|
||||
TxPostCheck(state),
|
||||
TxPreCheckForState(state),
|
||||
TxPostCheckForState(state),
|
||||
)
|
||||
|
||||
return res.Data, res.RetainHeight, err
|
||||
|
||||
@@ -1,22 +1,85 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// TxPreCheck returns a function to filter transactions before processing.
|
||||
// The function limits the size of a transaction to the block's maximum data size.
|
||||
func TxPreCheck(state State) mempool.PreCheckFunc {
|
||||
maxDataBytes := types.MaxDataBytesNoEvidence(
|
||||
state.ConsensusParams.Block.MaxBytes,
|
||||
state.Validators.Size(),
|
||||
func cachingStateFetcher(store Store) func() (State, error) {
|
||||
const ttl = time.Second
|
||||
|
||||
var (
|
||||
last time.Time
|
||||
mutex = &sync.Mutex{}
|
||||
cache State
|
||||
err error
|
||||
)
|
||||
return mempool.PreCheckMaxBytes(maxDataBytes)
|
||||
|
||||
return func() (State, error) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
|
||||
if time.Since(last) < ttl && cache.ChainID != "" {
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
cache, err = store.Load()
|
||||
if err != nil {
|
||||
return State{}, err
|
||||
}
|
||||
last = time.Now()
|
||||
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TxPostCheck returns a function to filter transactions after processing.
|
||||
// The function limits the gas wanted by a transaction to the block's maximum total gas.
|
||||
func TxPostCheck(state State) mempool.PostCheckFunc {
|
||||
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)
|
||||
// TxPreCheckFromStore returns a function to filter transactions before processing.
|
||||
// The function limits the size of a transaction to the block's maximum data size.
|
||||
func TxPreCheckFromStore(store Store) mempool.PreCheckFunc {
|
||||
fetch := cachingStateFetcher(store)
|
||||
|
||||
return func(tx types.Tx) error {
|
||||
state, err := fetch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return TxPreCheckForState(state)(tx)
|
||||
}
|
||||
}
|
||||
|
||||
func TxPreCheckForState(state State) mempool.PreCheckFunc {
|
||||
return func(tx types.Tx) error {
|
||||
maxDataBytes := types.MaxDataBytesNoEvidence(
|
||||
state.ConsensusParams.Block.MaxBytes,
|
||||
state.Validators.Size(),
|
||||
)
|
||||
return mempool.PreCheckMaxBytes(maxDataBytes)(tx)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TxPostCheckFromStore returns a function to filter transactions after processing.
|
||||
// The function limits the gas wanted by a transaction to the block's maximum total gas.
|
||||
func TxPostCheckFromStore(store Store) mempool.PostCheckFunc {
|
||||
fetch := cachingStateFetcher(store)
|
||||
|
||||
return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
|
||||
state, err := fetch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TxPostCheckForState(state State) mempool.PostCheckFunc {
|
||||
return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
|
||||
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestTxFilter(t *testing.T) {
|
||||
state, err := sm.MakeGenesisState(genDoc)
|
||||
require.NoError(t, err)
|
||||
|
||||
f := sm.TxPreCheck(state)
|
||||
f := sm.TxPreCheckForState(state)
|
||||
if tc.isErr {
|
||||
assert.NotNil(t, f(tc.tx), "#%v", i)
|
||||
} else {
|
||||
|
||||
34
node/node.go
34
node/node.go
@@ -143,11 +143,8 @@ func makeNode(
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
err = genDoc.ValidateAndComplete()
|
||||
if err != nil {
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("error in genesis doc: %w", err),
|
||||
makeCloser(closers))
|
||||
if err = genDoc.ValidateAndComplete(); err != nil {
|
||||
return nil, combineCloseError(fmt.Errorf("error in genesis doc: %w", err), makeCloser(closers))
|
||||
}
|
||||
|
||||
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
|
||||
@@ -241,10 +238,6 @@ func makeNode(
|
||||
}
|
||||
}
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
|
||||
|
||||
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
|
||||
@@ -271,14 +264,14 @@ func makeNode(
|
||||
}
|
||||
|
||||
mpReactor, mp, err := createMempoolReactor(ctx,
|
||||
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
|
||||
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
evReactor, evPool, err := createEvidenceReactor(ctx,
|
||||
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
|
||||
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
@@ -295,8 +288,12 @@ func makeNode(
|
||||
sm.BlockExecutorWithMetrics(nodeMetrics.state),
|
||||
)
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
csReactor, csState, err := createConsensusReactor(ctx,
|
||||
cfg, state, blockExec, blockStore, mp, evPool,
|
||||
cfg, stateStore, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
|
||||
peerManager, router, logger,
|
||||
)
|
||||
@@ -308,7 +305,7 @@ func makeNode(
|
||||
// doing a state sync first.
|
||||
bcReactor, err := blocksync.NewReactor(ctx,
|
||||
logger.With("module", "blockchain"),
|
||||
state.Copy(),
|
||||
stateStore,
|
||||
blockExec,
|
||||
blockStore,
|
||||
csReactor,
|
||||
@@ -728,10 +725,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
|
||||
// loadStateFromDBOrGenesisDocProvider attempts to load the state from the
|
||||
// database, or creates one using the given genesisDocProvider. On success this also
|
||||
// returns the genesis doc loaded through the given provider.
|
||||
func loadStateFromDBOrGenesisDocProvider(
|
||||
stateStore sm.Store,
|
||||
genDoc *types.GenesisDoc,
|
||||
) (sm.State, error) {
|
||||
func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.GenesisDoc) (sm.State, error) {
|
||||
|
||||
// 1. Attempt to load state form the database
|
||||
state, err := stateStore.Load()
|
||||
@@ -745,6 +739,12 @@ func loadStateFromDBOrGenesisDocProvider(
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
|
||||
// 3. save the gensis document to the state store so
|
||||
// its fetchable by other callers.
|
||||
if err := stateStore.Save(state); err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return state, nil
|
||||
|
||||
@@ -292,7 +292,6 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
// Make EvidencePool
|
||||
@@ -392,7 +391,6 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
// fill the mempool with one txs just below the maximum size
|
||||
@@ -457,7 +455,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
// fill the mempool with one txs just below the maximum size
|
||||
|
||||
@@ -172,7 +172,7 @@ func createMempoolReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
proxyApp abciclient.Client,
|
||||
state sm.State,
|
||||
store sm.Store,
|
||||
memplMetrics *mempool.Metrics,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
@@ -184,10 +184,9 @@ func createMempoolReactor(
|
||||
logger,
|
||||
cfg.Mempool,
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
mempool.WithMetrics(memplMetrics),
|
||||
mempool.WithPreCheck(sm.TxPreCheck(state)),
|
||||
mempool.WithPostCheck(sm.TxPostCheck(state)),
|
||||
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
|
||||
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
|
||||
)
|
||||
|
||||
reactor, err := mempool.NewReactor(
|
||||
@@ -214,7 +213,7 @@ func createEvidenceReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
stateDB dbm.DB,
|
||||
store sm.Store,
|
||||
blockStore *store.BlockStore,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
@@ -229,7 +228,7 @@ func createEvidenceReactor(
|
||||
|
||||
logger = logger.With("module", "evidence")
|
||||
|
||||
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
|
||||
evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
|
||||
}
|
||||
@@ -253,7 +252,7 @@ func createEvidenceReactor(
|
||||
func createConsensusReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
state sm.State,
|
||||
store sm.Store,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore sm.BlockStore,
|
||||
mp mempool.Mempool,
|
||||
@@ -268,16 +267,19 @@ func createConsensusReactor(
|
||||
) (*consensus.Reactor, *consensus.State, error) {
|
||||
logger = logger.With("module", "consensus")
|
||||
|
||||
consensusState := consensus.NewState(ctx,
|
||||
consensusState, err := consensus.NewState(ctx,
|
||||
logger,
|
||||
cfg.Consensus,
|
||||
state.Copy(),
|
||||
store,
|
||||
blockExec,
|
||||
blockStore,
|
||||
mp,
|
||||
evidencePool,
|
||||
consensus.StateMetrics(csMetrics),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if privValidator != nil && cfg.Mode == config.ModeValidator {
|
||||
consensusState.SetPrivValidator(ctx, privValidator)
|
||||
|
||||
@@ -31,7 +31,6 @@ func init() {
|
||||
log.NewNopLogger(),
|
||||
cfg,
|
||||
appConnMem,
|
||||
0,
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user