reader_concurrency_semaphore: remove default parameter values from constructors
It's easy to forget about supplying the correct value for a parameter when it has a default value specified. It's safer if 'production code' is forced to always supply these parameters manually. The default values were mostly useful in tests, where some parameters didn't matter that much and where the majority of uses of the class are. Without default values adding a new parameter is a pain, forcing one to modify every usage in the tests - and there are a bunch of them. To solve this, we introduce a new constructor which requires passing the `for_tests` tag, marking that the constructor is only supposed to be used in tests (and the constructor has an appropriate comment). This constructor uses default values, but the other constructors - used in 'production code' - do not.
This commit is contained in:
@@ -340,14 +340,16 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _streaming_concurrency_sem(
|
||||
max_count_streaming_concurrent_reads,
|
||||
max_memory_streaming_concurrent_reads(),
|
||||
"_streaming_concurrency_sem")
|
||||
"_streaming_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
// No limits, just for accounting.
|
||||
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction")
|
||||
, _system_read_concurrency_sem(
|
||||
// Using higher initial concurrency, see revert_initial_system_read_concurrency_boost().
|
||||
max_count_concurrent_reads,
|
||||
max_memory_system_concurrent_reads(),
|
||||
"_system_read_concurrency_sem")
|
||||
"_system_read_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
, _row_cache_tracker(cache_tracker::register_metrics::yes)
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
|
||||
@@ -625,7 +625,8 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
|
||||
: reader_concurrency_semaphore(
|
||||
std::numeric_limits<int>::max(),
|
||||
std::numeric_limits<ssize_t>::max(),
|
||||
std::move(name)) {}
|
||||
std::move(name),
|
||||
std::numeric_limits<size_t>::max()) {}
|
||||
|
||||
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
if (!_stats.total_permits) {
|
||||
|
||||
@@ -239,7 +239,7 @@ public:
|
||||
reader_concurrency_semaphore(int count,
|
||||
ssize_t memory,
|
||||
sstring name,
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max());
|
||||
size_t max_queue_length);
|
||||
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
///
|
||||
@@ -247,6 +247,17 @@ public:
|
||||
/// The semaphore's name has to be unique!
|
||||
explicit reader_concurrency_semaphore(no_limits, sstring name);
|
||||
|
||||
/// A helper constructor *only for tests* that supplies default arguments.
|
||||
/// The other constructors have default values removed so 'production-code'
|
||||
/// is forced to specify all of them manually to avoid bugs.
|
||||
struct for_tests{};
|
||||
reader_concurrency_semaphore(for_tests, sstring name,
|
||||
int count = std::numeric_limits<int>::max(),
|
||||
ssize_t memory = std::numeric_limits<ssize_t>::max(),
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max())
|
||||
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length)
|
||||
{}
|
||||
|
||||
~reader_concurrency_semaphore();
|
||||
|
||||
reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete;
|
||||
|
||||
@@ -425,7 +425,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
|
||||
|
||||
simple_schema s;
|
||||
|
||||
reader_concurrency_semaphore sem(1, 100, get_name());
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
|
||||
|
||||
@@ -3419,7 +3419,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
|
||||
reader_concurrency_semaphore semaphore(1, 0, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
|
||||
@@ -163,7 +163,7 @@ public:
|
||||
};
|
||||
|
||||
test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h, ssize_t max_memory = std::numeric_limits<ssize_t>::max())
|
||||
: _sem(std::numeric_limits<int>::max(), max_memory, "test_querier_cache")
|
||||
: _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits<int>::max(), max_memory)
|
||||
, _cache(entry_ttl)
|
||||
, _mutations(make_mutations(_s, external_make_value))
|
||||
, _mutation_source([this] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
|
||||
@@ -63,7 +63,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_releases_units) {
|
||||
simple_schema s;
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
// Not admitted, active
|
||||
@@ -123,7 +123,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
simple_schema s;
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
const auto base_resources = reader_concurrency_semaphore::resources{1, 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -279,7 +279,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
|
||||
#endif
|
||||
|
||||
simple_schema s;
|
||||
reader_concurrency_semaphore semaphore(count, count * 1024, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), count, count * 1024);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
std::vector<std::unique_ptr<reader>> readers;
|
||||
@@ -396,7 +396,7 @@ class dummy_file_impl : public file_impl {
|
||||
|
||||
SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
return async([&] {
|
||||
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get();
|
||||
|
||||
@@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
|
||||
SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
|
||||
return async([&] () {
|
||||
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 2, new_reader_base_cost);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
{
|
||||
@@ -496,7 +496,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
|
||||
|
||||
SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
|
||||
return async([&] () {
|
||||
reader_concurrency_semaphore semaphore(1, new_reader_base_cost, get_name(), 2);
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost, 2);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
{
|
||||
@@ -599,7 +599,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
simple_schema s;
|
||||
const auto schema_ptr = s.schema().get();
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto require_can_admit = [&] (bool expected_can_admit, const char* description,
|
||||
@@ -841,7 +841,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 0);
|
||||
|
||||
@@ -796,7 +796,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
}
|
||||
};
|
||||
|
||||
reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name());
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
|
||||
auto schema = schema_builder("ks", "cf")
|
||||
|
||||
@@ -96,7 +96,7 @@ public:
|
||||
auto name = format("tests::reader_lifecycle_policy@{}@shard_id={}", fmt::ptr(this), shard);
|
||||
if (_evict_paused_readers) {
|
||||
// Create with no memory, so all inactive reads are immediately evicted.
|
||||
_contexts[shard]->semaphore.emplace(1, 0, std::move(name));
|
||||
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0);
|
||||
} else {
|
||||
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user