From 62fab24cdb6b3451d35d1082c86730aa7edd025a Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 25 Mar 2022 13:05:45 -0400 Subject: [PATCH] add mechanism to change the number of bytes sent to the application --- config/config.go | 43 ++++++---- config/toml.go | 12 +++ internal/consensus/byzantine_test.go | 2 +- internal/consensus/state.go | 2 +- internal/state/execution.go | 18 ++++- internal/state/execution_test.go | 117 +++++++++++++++++++++++++-- internal/state/tx_filter.go | 14 ++-- node/node_test.go | 13 ++- types/block.go | 22 +++-- types/block_test.go | 30 +++---- 10 files changed, 205 insertions(+), 68 deletions(-) diff --git a/config/config.go b/config/config.go index e33b8dd7e..cf7117178 100644 --- a/config/config.go +++ b/config/config.go @@ -981,6 +981,17 @@ type ConsensusConfig struct { CreateEmptyBlocks bool `mapstructure:"create-empty-blocks"` CreateEmptyBlocksInterval time.Duration `mapstructure:"create-empty-blocks-interval"` + // ExperimentalPrepareProposalTxBytes determines how many bytes will be sent to the application + // duration the PreparePropsal call. + // + // If this field is set to -1 then entire contents of the mempool will be sent + // to the application during PrepareProposal. + // If the value of ExperimentalPrepareProposalTxBytes is less than the block transaction data size + // determined using the ConsensusParams.Block.MaxBytes value, then the number and + // size of the transactions sent to the application during PrepareProposal + // will be instead determined using the value of ConsensusParams.Block.MaxBytes. + ExperimentalPrepareProposalTxBytes int64 `mapstructure:"experimental-prepare-proposal-tx-bytes"` + // Reactor sleep duration parameters PeerGossipSleepDuration time.Duration `mapstructure:"peer-gossip-sleep-duration"` PeerQueryMaj23SleepDuration time.Duration `mapstructure:"peer-query-maj23-sleep-duration"` @@ -991,20 +1002,21 @@ type ConsensusConfig struct { // DefaultConsensusConfig returns a default configuration for the consensus service func DefaultConsensusConfig() *ConsensusConfig { return &ConsensusConfig{ - WalPath: filepath.Join(defaultDataDir, "cs.wal", "wal"), - TimeoutPropose: 3000 * time.Millisecond, - TimeoutProposeDelta: 500 * time.Millisecond, - TimeoutPrevote: 1000 * time.Millisecond, - TimeoutPrevoteDelta: 500 * time.Millisecond, - TimeoutPrecommit: 1000 * time.Millisecond, - TimeoutPrecommitDelta: 500 * time.Millisecond, - TimeoutCommit: 1000 * time.Millisecond, - SkipTimeoutCommit: false, - CreateEmptyBlocks: true, - CreateEmptyBlocksInterval: 0 * time.Second, - PeerGossipSleepDuration: 100 * time.Millisecond, - PeerQueryMaj23SleepDuration: 2000 * time.Millisecond, - DoubleSignCheckHeight: int64(0), + WalPath: filepath.Join(defaultDataDir, "cs.wal", "wal"), + TimeoutPropose: 3000 * time.Millisecond, + TimeoutProposeDelta: 500 * time.Millisecond, + TimeoutPrevote: 1000 * time.Millisecond, + TimeoutPrevoteDelta: 500 * time.Millisecond, + TimeoutPrecommit: 1000 * time.Millisecond, + TimeoutPrecommitDelta: 500 * time.Millisecond, + TimeoutCommit: 1000 * time.Millisecond, + SkipTimeoutCommit: false, + CreateEmptyBlocks: true, + CreateEmptyBlocksInterval: 0 * time.Second, + ExperimentalPrepareProposalTxBytes: 0, + PeerGossipSleepDuration: 100 * time.Millisecond, + PeerQueryMaj23SleepDuration: 2000 * time.Millisecond, + DoubleSignCheckHeight: int64(0), } } @@ -1070,6 +1082,9 @@ func (cfg *ConsensusConfig) ValidateBasic() error { if cfg.CreateEmptyBlocksInterval < 0 { return errors.New("create-empty-blocks-interval can't be negative") } + if cfg.ExperimentalPrepareProposalTxBytes < -1 { + return errors.New("prepare-proposal-tx-bytes must be greater than -1") + } if cfg.PeerGossipSleepDuration < 0 { return errors.New("peer-gossip-sleep-duration can't be negative") } diff --git a/config/toml.go b/config/toml.go index a82e7f59d..a6da6d32c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -476,6 +476,18 @@ double-sign-check-height = {{ .Consensus.DoubleSignCheckHeight }} # Make progress as soon as we have all the precommits (as if TimeoutCommit = 0) skip-timeout-commit = {{ .Consensus.SkipTimeoutCommit }} +# ExperimentalPrepareProposalTxBytes is an experimental field. +# This field determines how many bytes will be sent to the application +# duration the PreparePropsal call. +# +# If this field is set to -1 then entire contents of the mempool will be sent +# to the application during PrepareProposal. +# If the value of ExperimentalPrepareProposalTxBytes is less than the block transaction data size +# determined using the ConsensusParams.Block.MaxBytes value, then the number and +# size of the transactions sent to the application during PrepareProposal +# will be instead determined using the value of ConsensusParams.Block.MaxBytes. +experimental-prepare-proposal-tx-bytes = {{ .Consensus.ExperimentalPrepareProposalTxBytes }} + # EmptyBlocks mode and possible interval between empty blocks create-empty-blocks = {{ .Consensus.CreateEmptyBlocks }} create-empty-blocks-interval = "{{ .Consensus.CreateEmptyBlocksInterval }}" diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 40a37b812..e67146889 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -201,7 +201,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { proposerAddr := lazyNodeState.privValidatorPubKey.Address() block, err := lazyNodeState.blockExec.CreateProposalBlock( - ctx, lazyNodeState.Height, lazyNodeState.state, commit, proposerAddr, nil) + ctx, lazyNodeState.Height, lazyNodeState.state, commit, proposerAddr, nil, 0) require.NoError(t, err) blockParts, err := block.MakePartSet(types.BlockPartSizeBytes) require.NoError(t, err) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index e27d971ca..d8a48729d 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1415,7 +1415,7 @@ func (cs *State) createProposalBlock(ctx context.Context) (*types.Block, error) proposerAddr := cs.privValidatorPubKey.Address() - return cs.blockExec.CreateProposalBlock(ctx, cs.Height, cs.state, commit, proposerAddr, cs.LastCommit.GetVotes()) + return cs.blockExec.CreateProposalBlock(ctx, cs.Height, cs.state, commit, proposerAddr, cs.LastCommit.GetVotes(), cs.config.PrepareProposalTxBytes) } // Enter: `timeoutPropose` after entering Propose. diff --git a/internal/state/execution.go b/internal/state/execution.go index 4f02092d5..1961c6b91 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -104,17 +104,27 @@ func (blockExec *BlockExecutor) CreateProposalBlock( commit *types.Commit, proposerAddr []byte, votes []*types.Vote, + appMaxBytes int64, ) (*types.Block, error) { - maxBytes := state.ConsensusParams.Block.MaxBytes maxGas := state.ConsensusParams.Block.MaxGas evidence, evSize := blockExec.evpool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) + maxDataBytes, err := types.MaxDataBytes(state.ConsensusParams.Block.MaxBytes, evSize, state.Validators.Size()) + if err != nil { + panic(err) + } - // Fetch a limited amount of valid txs - maxDataBytes := types.MaxDataBytes(maxBytes, evSize, state.Validators.Size()) + var txs types.Txs + switch { + case appMaxBytes == -1: + txs = blockExec.mempool.ReapMaxBytesMaxGas(-1, -1) + case appMaxBytes > maxDataBytes: + txs = blockExec.mempool.ReapMaxBytesMaxGas(appMaxBytes, -1) + default: + txs = blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) + } - txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) block := state.MakeBlock(height, txs, commit, evidence, proposerAddr) localLastCommit := buildLastCommitInfo(block, blockExec.store, state.InitialHeight) diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index 58580e7be..2cc488c6c 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -604,6 +604,111 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { assert.NotEmpty(t, state.NextValidators.Validators) } +func TestPrepareProposalAppMaxBytes(t *testing.T) { + for _, tc := range []struct { + name string + nodeMaxBytes int64 + conesensusMaxBytes int64 + conesensusMaxGas int64 + + expectedMaxBytesCalled int64 + expectedMaxGasCalled int64 + }{ + { + name: "Local PrepareProposalTxBytes set to 1000", + nodeMaxBytes: 5000000, + conesensusMaxGas: 100, + conesensusMaxBytes: 5000000, + + expectedMaxBytesCalled: 5000000, + expectedMaxGasCalled: -1, + }, + { + name: "Local PrepareProposalTxBytes set to -1", + nodeMaxBytes: -1, + conesensusMaxGas: 100, + conesensusMaxBytes: 10000000, + + expectedMaxBytesCalled: -1, + expectedMaxGasCalled: -1, + }, + { + name: "Local PrepareProposalTxBytes smaller than block data", + nodeMaxBytes: 50000, + conesensusMaxGas: 100, + conesensusMaxBytes: 10000000, + + expectedMaxBytesCalled: func(t *testing.T) int64 { + res, err := types.MaxDataBytes(10000000, 0, 1) + require.NoError(t, err) + return res + }(t), + expectedMaxGasCalled: 100, + }, + } { + t.Run(tc.name, func(t *testing.T) { + const height = 2 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger := log.NewNopLogger() + + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + app := abcimocks.NewBaseMock() + app.On("PrepareProposal", mock.Anything).Return(abci.ResponsePrepareProposal{ + ModifiedTxStatus: abci.ResponsePrepareProposal_MODIFIED, + }, nil) + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) + err := proxyApp.Start(ctx) + require.NoError(t, err) + + state, stateDB, privVals := makeState(t, 1, height) + state.ConsensusParams.Block.MaxBytes = tc.conesensusMaxBytes + state.ConsensusParams.Block.MaxGas = tc.conesensusMaxGas + stateStore := sm.NewStore(stateDB) + mp := &mpmocks.Mempool{} + mp.On("Lock").Return() + mp.On("Unlock").Return() + mp.On("FlushAppConn", mock.Anything).Return(nil) + mp.On("Update", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything).Return(nil) + var txs types.Txs + + var mb int64 + if tc.nodeMaxBytes <= 0 { + mb = 1000 + } + for i := 0; i < int(mb); i++ { + txs = append(txs, []byte{byte(i)}) + } + + mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(txs) + + blockExec := sm.NewBlockExecutor( + stateStore, + logger, + proxyApp, + mp, + sm.EmptyEvidencePool{}, + nil, + eventBus, + ) + pa, _ := state.Validators.GetByIndex(0) + commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) + _, err = blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, tc.nodeMaxBytes) + require.NoError(t, err) + mp.AssertCalled(t, "ReapMaxBytesMaxGas", tc.expectedMaxBytesCalled, tc.expectedMaxGasCalled) + }) + } +} func TestEmptyPrepareProposal(t *testing.T) { const height = 2 ctx, cancel := context.WithCancel(context.Background()) @@ -649,7 +754,7 @@ func TestEmptyPrepareProposal(t *testing.T) { ) pa, _ := state.Validators.GetByIndex(0) commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) - _, err = blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) + _, err = blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) require.NoError(t, err) } @@ -705,7 +810,7 @@ func TestPrepareProposalPanicOnInvalid(t *testing.T) { commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) require.Panics(t, func() { - blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) //nolint:errcheck + blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) //nolint:errcheck }) mp.AssertExpectations(t) @@ -760,7 +865,7 @@ func TestPrepareProposalRemoveTxs(t *testing.T) { ) pa, _ := state.Validators.GetByIndex(0) commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) - block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) + block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) require.NoError(t, err) require.Len(t, block.Data.Txs.ToSliceOfBytes(), len(trs)-2) @@ -819,7 +924,7 @@ func TestPrepareProposalAddedTxsIncluded(t *testing.T) { ) pa, _ := state.Validators.GetByIndex(0) commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) - block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) + block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) require.NoError(t, err) require.Equal(t, txs[0], block.Data.Txs[0]) @@ -875,7 +980,7 @@ func TestPrepareProposalReorderTxs(t *testing.T) { ) pa, _ := state.Validators.GetByIndex(0) commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) - block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) + block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) require.NoError(t, err) for i, tx := range block.Data.Txs { require.Equal(t, types.Tx(trs[i].Tx), tx) @@ -938,7 +1043,7 @@ func TestPrepareProposalModifiedTxStatusFalse(t *testing.T) { ) pa, _ := state.Validators.GetByIndex(0) commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) - block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) + block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) require.NoError(t, err) for i, tx := range block.Data.Txs { require.Equal(t, txs[i], tx) diff --git a/internal/state/tx_filter.go b/internal/state/tx_filter.go index 11dd9ce67..1795aeb60 100644 --- a/internal/state/tx_filter.go +++ b/internal/state/tx_filter.go @@ -54,14 +54,14 @@ func TxPreCheckFromStore(store Store) mempool.PreCheckFunc { } func TxPreCheckForState(state State) mempool.PreCheckFunc { - return func(tx types.Tx) error { - maxDataBytes := types.MaxDataBytesNoEvidence( - state.ConsensusParams.Block.MaxBytes, - state.Validators.Size(), - ) - return mempool.PreCheckMaxBytes(maxDataBytes)(tx) + maxDataBytes, err := types.MaxDataBytesNoEvidence( + state.ConsensusParams.Block.MaxBytes, + state.Validators.Size(), + ) + if err != nil { + panic(err) } - + return mempool.PreCheckMaxBytes(maxDataBytes) } // TxPostCheckFromStore returns a function to filter transactions after processing. diff --git a/node/node_test.go b/node/node_test.go index e70be984f..43173007a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -342,6 +342,7 @@ func TestCreateProposalBlock(t *testing.T) { state, commit, proposerAddr, nil, + 0, ) require.NoError(t, err) @@ -396,8 +397,9 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { ) // fill the mempool with one txs just below the maximum size - txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1)) - tx := tmrand.Bytes(txLength - 4) // to account for the varint + mb, err := types.MaxDataBytesNoEvidence(maxBytes, 1) + require.NoError(t, err) + tx := tmrand.Bytes(int(mb) - 4) // to account for the varint err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{}) assert.NoError(t, err) @@ -421,6 +423,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { state, commit, proposerAddr, nil, + 0, ) require.NoError(t, err) @@ -464,8 +467,9 @@ func TestMaxProposalBlockSize(t *testing.T) { ) // fill the mempool with one txs just below the maximum size - txLength := int(types.MaxDataBytesNoEvidence(maxBytes, types.MaxVotesCount)) - tx := tmrand.Bytes(txLength - 6) // to account for the varint + mb, err := types.MaxDataBytesNoEvidence(maxBytes, types.MaxVotesCount) + require.NoError(t, err) + tx := tmrand.Bytes(int(mb) - 6) // to account for the varint err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{}) assert.NoError(t, err) // now produce more txs than what a normal block can hold with 10 smaller txs @@ -542,6 +546,7 @@ func TestMaxProposalBlockSize(t *testing.T) { state, commit, proposerAddr, nil, + 0, ) require.NoError(t, err) partSet, err := block.MakePartSet(types.BlockPartSizeBytes) diff --git a/types/block.go b/types/block.go index d6e45af6a..df26cf81d 100644 --- a/types/block.go +++ b/types/block.go @@ -262,9 +262,8 @@ func BlockFromProto(bp *tmproto.Block) (*Block, error) { //----------------------------------------------------------------------------- // MaxDataBytes returns the maximum size of block's data. -// -// XXX: Panics on negative result. -func MaxDataBytes(maxBytes, evidenceBytes int64, valsCount int) int64 { +// Returns an error if the max data is negative. +func MaxDataBytes(maxBytes, evidenceBytes int64, valsCount int) (int64, error) { maxDataBytes := maxBytes - MaxOverheadForBlock - MaxHeaderBytes - @@ -272,36 +271,33 @@ func MaxDataBytes(maxBytes, evidenceBytes int64, valsCount int) int64 { evidenceBytes if maxDataBytes < 0 { - panic(fmt.Sprintf( + return maxDataBytes, fmt.Errorf( "Negative MaxDataBytes. Block.MaxBytes=%d is too small to accommodate header&lastCommit&evidence=%d", maxBytes, -(maxDataBytes - maxBytes), - )) + ) } - - return maxDataBytes + return maxDataBytes, nil } // MaxDataBytesNoEvidence returns the maximum size of block's data when // evidence count is unknown. MaxEvidencePerBlock will be used for the size // of evidence. -// -// XXX: Panics on negative result. -func MaxDataBytesNoEvidence(maxBytes int64, valsCount int) int64 { +func MaxDataBytesNoEvidence(maxBytes int64, valsCount int) (int64, error) { maxDataBytes := maxBytes - MaxOverheadForBlock - MaxHeaderBytes - MaxCommitBytes(valsCount) if maxDataBytes < 0 { - panic(fmt.Sprintf( + return maxDataBytes, fmt.Errorf( "Negative MaxDataBytesUnknownEvidence. Block.MaxBytes=%d is too small to accommodate header&lastCommit&evidence=%d", maxBytes, -(maxDataBytes - maxBytes), - )) + ) } - return maxDataBytes + return maxDataBytes, nil } // MakeBlock returns a new block with an empty header, except what can be diff --git a/types/block_test.go b/types/block_test.go index 4ed47dd9d..9bba09497 100644 --- a/types/block_test.go +++ b/types/block_test.go @@ -499,7 +499,7 @@ func TestBlockMaxDataBytes(t *testing.T) { maxBytes int64 valsCount int evidenceBytes int64 - panics bool + errors bool result int64 }{ 0: {-10, 1, 0, true, 0}, @@ -513,15 +513,12 @@ func TestBlockMaxDataBytes(t *testing.T) { for i, tc := range testCases { tc := tc - if tc.panics { - assert.Panics(t, func() { - MaxDataBytes(tc.maxBytes, tc.evidenceBytes, tc.valsCount) - }, "#%v", i) + b, err := MaxDataBytes(tc.maxBytes, tc.evidenceBytes, tc.valsCount) + if tc.errors { + require.Error(t, err, "#%v", i) } else { - assert.Equal(t, - tc.result, - MaxDataBytes(tc.maxBytes, tc.evidenceBytes, tc.valsCount), - "#%v", i) + require.NoError(t, err) + assert.Equal(t, tc.result, b, "#%v", i) } } } @@ -530,7 +527,7 @@ func TestBlockMaxDataBytesNoEvidence(t *testing.T) { testCases := []struct { maxBytes int64 valsCount int - panics bool + errors bool result int64 }{ 0: {-10, 1, true, 0}, @@ -542,15 +539,12 @@ func TestBlockMaxDataBytesNoEvidence(t *testing.T) { for i, tc := range testCases { tc := tc - if tc.panics { - assert.Panics(t, func() { - MaxDataBytesNoEvidence(tc.maxBytes, tc.valsCount) - }, "#%v", i) + b, err := MaxDataBytesNoEvidence(tc.maxBytes, tc.valsCount) + if tc.errors { + assert.Error(t, err, "#%v", i) } else { - assert.Equal(t, - tc.result, - MaxDataBytesNoEvidence(tc.maxBytes, tc.valsCount), - "#%v", i) + require.NoError(t, err) + assert.Equal(t, tc.result, b, "#%v", i) } } }