Compare commits

...

1 Commits

Author SHA1 Message Date
William Banfield
62fab24cdb add mechanism to change the number of bytes sent to the application 2022-03-28 12:09:10 -04:00
10 changed files with 205 additions and 68 deletions

View File

@@ -981,6 +981,17 @@ type ConsensusConfig struct {
CreateEmptyBlocks bool `mapstructure:"create-empty-blocks"`
CreateEmptyBlocksInterval time.Duration `mapstructure:"create-empty-blocks-interval"`
// ExperimentalPrepareProposalTxBytes determines how many bytes will be sent to the application
// duration the PreparePropsal call.
//
// If this field is set to -1 then entire contents of the mempool will be sent
// to the application during PrepareProposal.
// If the value of ExperimentalPrepareProposalTxBytes is less than the block transaction data size
// determined using the ConsensusParams.Block.MaxBytes value, then the number and
// size of the transactions sent to the application during PrepareProposal
// will be instead determined using the value of ConsensusParams.Block.MaxBytes.
ExperimentalPrepareProposalTxBytes int64 `mapstructure:"experimental-prepare-proposal-tx-bytes"`
// Reactor sleep duration parameters
PeerGossipSleepDuration time.Duration `mapstructure:"peer-gossip-sleep-duration"`
PeerQueryMaj23SleepDuration time.Duration `mapstructure:"peer-query-maj23-sleep-duration"`
@@ -991,20 +1002,21 @@ type ConsensusConfig struct {
// DefaultConsensusConfig returns a default configuration for the consensus service
func DefaultConsensusConfig() *ConsensusConfig {
return &ConsensusConfig{
WalPath: filepath.Join(defaultDataDir, "cs.wal", "wal"),
TimeoutPropose: 3000 * time.Millisecond,
TimeoutProposeDelta: 500 * time.Millisecond,
TimeoutPrevote: 1000 * time.Millisecond,
TimeoutPrevoteDelta: 500 * time.Millisecond,
TimeoutPrecommit: 1000 * time.Millisecond,
TimeoutPrecommitDelta: 500 * time.Millisecond,
TimeoutCommit: 1000 * time.Millisecond,
SkipTimeoutCommit: false,
CreateEmptyBlocks: true,
CreateEmptyBlocksInterval: 0 * time.Second,
PeerGossipSleepDuration: 100 * time.Millisecond,
PeerQueryMaj23SleepDuration: 2000 * time.Millisecond,
DoubleSignCheckHeight: int64(0),
WalPath: filepath.Join(defaultDataDir, "cs.wal", "wal"),
TimeoutPropose: 3000 * time.Millisecond,
TimeoutProposeDelta: 500 * time.Millisecond,
TimeoutPrevote: 1000 * time.Millisecond,
TimeoutPrevoteDelta: 500 * time.Millisecond,
TimeoutPrecommit: 1000 * time.Millisecond,
TimeoutPrecommitDelta: 500 * time.Millisecond,
TimeoutCommit: 1000 * time.Millisecond,
SkipTimeoutCommit: false,
CreateEmptyBlocks: true,
CreateEmptyBlocksInterval: 0 * time.Second,
ExperimentalPrepareProposalTxBytes: 0,
PeerGossipSleepDuration: 100 * time.Millisecond,
PeerQueryMaj23SleepDuration: 2000 * time.Millisecond,
DoubleSignCheckHeight: int64(0),
}
}
@@ -1070,6 +1082,9 @@ func (cfg *ConsensusConfig) ValidateBasic() error {
if cfg.CreateEmptyBlocksInterval < 0 {
return errors.New("create-empty-blocks-interval can't be negative")
}
if cfg.ExperimentalPrepareProposalTxBytes < -1 {
return errors.New("prepare-proposal-tx-bytes must be greater than -1")
}
if cfg.PeerGossipSleepDuration < 0 {
return errors.New("peer-gossip-sleep-duration can't be negative")
}

View File

@@ -476,6 +476,18 @@ double-sign-check-height = {{ .Consensus.DoubleSignCheckHeight }}
# Make progress as soon as we have all the precommits (as if TimeoutCommit = 0)
skip-timeout-commit = {{ .Consensus.SkipTimeoutCommit }}
# ExperimentalPrepareProposalTxBytes is an experimental field.
# This field determines how many bytes will be sent to the application
# duration the PreparePropsal call.
#
# If this field is set to -1 then entire contents of the mempool will be sent
# to the application during PrepareProposal.
# If the value of ExperimentalPrepareProposalTxBytes is less than the block transaction data size
# determined using the ConsensusParams.Block.MaxBytes value, then the number and
# size of the transactions sent to the application during PrepareProposal
# will be instead determined using the value of ConsensusParams.Block.MaxBytes.
experimental-prepare-proposal-tx-bytes = {{ .Consensus.ExperimentalPrepareProposalTxBytes }}
# EmptyBlocks mode and possible interval between empty blocks
create-empty-blocks = {{ .Consensus.CreateEmptyBlocks }}
create-empty-blocks-interval = "{{ .Consensus.CreateEmptyBlocksInterval }}"

View File

@@ -201,7 +201,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proposerAddr := lazyNodeState.privValidatorPubKey.Address()
block, err := lazyNodeState.blockExec.CreateProposalBlock(
ctx, lazyNodeState.Height, lazyNodeState.state, commit, proposerAddr, nil)
ctx, lazyNodeState.Height, lazyNodeState.state, commit, proposerAddr, nil, 0)
require.NoError(t, err)
blockParts, err := block.MakePartSet(types.BlockPartSizeBytes)
require.NoError(t, err)

View File

@@ -1415,7 +1415,7 @@ func (cs *State) createProposalBlock(ctx context.Context) (*types.Block, error)
proposerAddr := cs.privValidatorPubKey.Address()
return cs.blockExec.CreateProposalBlock(ctx, cs.Height, cs.state, commit, proposerAddr, cs.LastCommit.GetVotes())
return cs.blockExec.CreateProposalBlock(ctx, cs.Height, cs.state, commit, proposerAddr, cs.LastCommit.GetVotes(), cs.config.PrepareProposalTxBytes)
}
// Enter: `timeoutPropose` after entering Propose.

View File

@@ -104,17 +104,27 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
commit *types.Commit,
proposerAddr []byte,
votes []*types.Vote,
appMaxBytes int64,
) (*types.Block, error) {
maxBytes := state.ConsensusParams.Block.MaxBytes
maxGas := state.ConsensusParams.Block.MaxGas
evidence, evSize := blockExec.evpool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes)
maxDataBytes, err := types.MaxDataBytes(state.ConsensusParams.Block.MaxBytes, evSize, state.Validators.Size())
if err != nil {
panic(err)
}
// Fetch a limited amount of valid txs
maxDataBytes := types.MaxDataBytes(maxBytes, evSize, state.Validators.Size())
var txs types.Txs
switch {
case appMaxBytes == -1:
txs = blockExec.mempool.ReapMaxBytesMaxGas(-1, -1)
case appMaxBytes > maxDataBytes:
txs = blockExec.mempool.ReapMaxBytesMaxGas(appMaxBytes, -1)
default:
txs = blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
}
txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
block := state.MakeBlock(height, txs, commit, evidence, proposerAddr)
localLastCommit := buildLastCommitInfo(block, blockExec.store, state.InitialHeight)

View File

@@ -604,6 +604,111 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
assert.NotEmpty(t, state.NextValidators.Validators)
}
func TestPrepareProposalAppMaxBytes(t *testing.T) {
for _, tc := range []struct {
name string
nodeMaxBytes int64
conesensusMaxBytes int64
conesensusMaxGas int64
expectedMaxBytesCalled int64
expectedMaxGasCalled int64
}{
{
name: "Local PrepareProposalTxBytes set to 1000",
nodeMaxBytes: 5000000,
conesensusMaxGas: 100,
conesensusMaxBytes: 5000000,
expectedMaxBytesCalled: 5000000,
expectedMaxGasCalled: -1,
},
{
name: "Local PrepareProposalTxBytes set to -1",
nodeMaxBytes: -1,
conesensusMaxGas: 100,
conesensusMaxBytes: 10000000,
expectedMaxBytesCalled: -1,
expectedMaxGasCalled: -1,
},
{
name: "Local PrepareProposalTxBytes smaller than block data",
nodeMaxBytes: 50000,
conesensusMaxGas: 100,
conesensusMaxBytes: 10000000,
expectedMaxBytesCalled: func(t *testing.T) int64 {
res, err := types.MaxDataBytes(10000000, 0, 1)
require.NoError(t, err)
return res
}(t),
expectedMaxGasCalled: 100,
},
} {
t.Run(tc.name, func(t *testing.T) {
const height = 2
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
app := abcimocks.NewBaseMock()
app.On("PrepareProposal", mock.Anything).Return(abci.ResponsePrepareProposal{
ModifiedTxStatus: abci.ResponsePrepareProposal_MODIFIED,
}, nil)
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
state, stateDB, privVals := makeState(t, 1, height)
state.ConsensusParams.Block.MaxBytes = tc.conesensusMaxBytes
state.ConsensusParams.Block.MaxGas = tc.conesensusMaxGas
stateStore := sm.NewStore(stateDB)
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
var txs types.Txs
var mb int64
if tc.nodeMaxBytes <= 0 {
mb = 1000
}
for i := 0; i < int(mb); i++ {
txs = append(txs, []byte{byte(i)})
}
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(txs)
blockExec := sm.NewBlockExecutor(
stateStore,
logger,
proxyApp,
mp,
sm.EmptyEvidencePool{},
nil,
eventBus,
)
pa, _ := state.Validators.GetByIndex(0)
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
_, err = blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, tc.nodeMaxBytes)
require.NoError(t, err)
mp.AssertCalled(t, "ReapMaxBytesMaxGas", tc.expectedMaxBytesCalled, tc.expectedMaxGasCalled)
})
}
}
func TestEmptyPrepareProposal(t *testing.T) {
const height = 2
ctx, cancel := context.WithCancel(context.Background())
@@ -649,7 +754,7 @@ func TestEmptyPrepareProposal(t *testing.T) {
)
pa, _ := state.Validators.GetByIndex(0)
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
_, err = blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil)
_, err = blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0)
require.NoError(t, err)
}
@@ -705,7 +810,7 @@ func TestPrepareProposalPanicOnInvalid(t *testing.T) {
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
require.Panics(t,
func() {
blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) //nolint:errcheck
blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0) //nolint:errcheck
})
mp.AssertExpectations(t)
@@ -760,7 +865,7 @@ func TestPrepareProposalRemoveTxs(t *testing.T) {
)
pa, _ := state.Validators.GetByIndex(0)
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0)
require.NoError(t, err)
require.Len(t, block.Data.Txs.ToSliceOfBytes(), len(trs)-2)
@@ -819,7 +924,7 @@ func TestPrepareProposalAddedTxsIncluded(t *testing.T) {
)
pa, _ := state.Validators.GetByIndex(0)
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0)
require.NoError(t, err)
require.Equal(t, txs[0], block.Data.Txs[0])
@@ -875,7 +980,7 @@ func TestPrepareProposalReorderTxs(t *testing.T) {
)
pa, _ := state.Validators.GetByIndex(0)
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0)
require.NoError(t, err)
for i, tx := range block.Data.Txs {
require.Equal(t, types.Tx(trs[i].Tx), tx)
@@ -938,7 +1043,7 @@ func TestPrepareProposalModifiedTxStatusFalse(t *testing.T) {
)
pa, _ := state.Validators.GetByIndex(0)
commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil)
block, err := blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil, 0)
require.NoError(t, err)
for i, tx := range block.Data.Txs {
require.Equal(t, txs[i], tx)

View File

@@ -54,14 +54,14 @@ func TxPreCheckFromStore(store Store) mempool.PreCheckFunc {
}
func TxPreCheckForState(state State) mempool.PreCheckFunc {
return func(tx types.Tx) error {
maxDataBytes := types.MaxDataBytesNoEvidence(
state.ConsensusParams.Block.MaxBytes,
state.Validators.Size(),
)
return mempool.PreCheckMaxBytes(maxDataBytes)(tx)
maxDataBytes, err := types.MaxDataBytesNoEvidence(
state.ConsensusParams.Block.MaxBytes,
state.Validators.Size(),
)
if err != nil {
panic(err)
}
return mempool.PreCheckMaxBytes(maxDataBytes)
}
// TxPostCheckFromStore returns a function to filter transactions after processing.

View File

@@ -342,6 +342,7 @@ func TestCreateProposalBlock(t *testing.T) {
state, commit,
proposerAddr,
nil,
0,
)
require.NoError(t, err)
@@ -396,8 +397,9 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
)
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))
tx := tmrand.Bytes(txLength - 4) // to account for the varint
mb, err := types.MaxDataBytesNoEvidence(maxBytes, 1)
require.NoError(t, err)
tx := tmrand.Bytes(int(mb) - 4) // to account for the varint
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
@@ -421,6 +423,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
state, commit,
proposerAddr,
nil,
0,
)
require.NoError(t, err)
@@ -464,8 +467,9 @@ func TestMaxProposalBlockSize(t *testing.T) {
)
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, types.MaxVotesCount))
tx := tmrand.Bytes(txLength - 6) // to account for the varint
mb, err := types.MaxDataBytesNoEvidence(maxBytes, types.MaxVotesCount)
require.NoError(t, err)
tx := tmrand.Bytes(int(mb) - 6) // to account for the varint
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
// now produce more txs than what a normal block can hold with 10 smaller txs
@@ -542,6 +546,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
state, commit,
proposerAddr,
nil,
0,
)
require.NoError(t, err)
partSet, err := block.MakePartSet(types.BlockPartSizeBytes)

View File

@@ -262,9 +262,8 @@ func BlockFromProto(bp *tmproto.Block) (*Block, error) {
//-----------------------------------------------------------------------------
// MaxDataBytes returns the maximum size of block's data.
//
// XXX: Panics on negative result.
func MaxDataBytes(maxBytes, evidenceBytes int64, valsCount int) int64 {
// Returns an error if the max data is negative.
func MaxDataBytes(maxBytes, evidenceBytes int64, valsCount int) (int64, error) {
maxDataBytes := maxBytes -
MaxOverheadForBlock -
MaxHeaderBytes -
@@ -272,36 +271,33 @@ func MaxDataBytes(maxBytes, evidenceBytes int64, valsCount int) int64 {
evidenceBytes
if maxDataBytes < 0 {
panic(fmt.Sprintf(
return maxDataBytes, fmt.Errorf(
"Negative MaxDataBytes. Block.MaxBytes=%d is too small to accommodate header&lastCommit&evidence=%d",
maxBytes,
-(maxDataBytes - maxBytes),
))
)
}
return maxDataBytes
return maxDataBytes, nil
}
// MaxDataBytesNoEvidence returns the maximum size of block's data when
// evidence count is unknown. MaxEvidencePerBlock will be used for the size
// of evidence.
//
// XXX: Panics on negative result.
func MaxDataBytesNoEvidence(maxBytes int64, valsCount int) int64 {
func MaxDataBytesNoEvidence(maxBytes int64, valsCount int) (int64, error) {
maxDataBytes := maxBytes -
MaxOverheadForBlock -
MaxHeaderBytes -
MaxCommitBytes(valsCount)
if maxDataBytes < 0 {
panic(fmt.Sprintf(
return maxDataBytes, fmt.Errorf(
"Negative MaxDataBytesUnknownEvidence. Block.MaxBytes=%d is too small to accommodate header&lastCommit&evidence=%d",
maxBytes,
-(maxDataBytes - maxBytes),
))
)
}
return maxDataBytes
return maxDataBytes, nil
}
// MakeBlock returns a new block with an empty header, except what can be

View File

@@ -499,7 +499,7 @@ func TestBlockMaxDataBytes(t *testing.T) {
maxBytes int64
valsCount int
evidenceBytes int64
panics bool
errors bool
result int64
}{
0: {-10, 1, 0, true, 0},
@@ -513,15 +513,12 @@ func TestBlockMaxDataBytes(t *testing.T) {
for i, tc := range testCases {
tc := tc
if tc.panics {
assert.Panics(t, func() {
MaxDataBytes(tc.maxBytes, tc.evidenceBytes, tc.valsCount)
}, "#%v", i)
b, err := MaxDataBytes(tc.maxBytes, tc.evidenceBytes, tc.valsCount)
if tc.errors {
require.Error(t, err, "#%v", i)
} else {
assert.Equal(t,
tc.result,
MaxDataBytes(tc.maxBytes, tc.evidenceBytes, tc.valsCount),
"#%v", i)
require.NoError(t, err)
assert.Equal(t, tc.result, b, "#%v", i)
}
}
}
@@ -530,7 +527,7 @@ func TestBlockMaxDataBytesNoEvidence(t *testing.T) {
testCases := []struct {
maxBytes int64
valsCount int
panics bool
errors bool
result int64
}{
0: {-10, 1, true, 0},
@@ -542,15 +539,12 @@ func TestBlockMaxDataBytesNoEvidence(t *testing.T) {
for i, tc := range testCases {
tc := tc
if tc.panics {
assert.Panics(t, func() {
MaxDataBytesNoEvidence(tc.maxBytes, tc.valsCount)
}, "#%v", i)
b, err := MaxDataBytesNoEvidence(tc.maxBytes, tc.valsCount)
if tc.errors {
assert.Error(t, err, "#%v", i)
} else {
assert.Equal(t,
tc.result,
MaxDataBytesNoEvidence(tc.maxBytes, tc.valsCount),
"#%v", i)
require.NoError(t, err)
assert.Equal(t, tc.result, b, "#%v", i)
}
}
}