reader_concurrency_semaphore: remove default parameter values from constructors

It's easy to forget about supplying the correct value for a parameter
when it has a default value specified. It's safer if 'production code'
is forced to always supply these parameters manually.

The default values were mostly useful in tests, where some parameters
didn't matter that much and where the majority of uses of the class are.
Without default values adding a new parameter is a pain, forcing one to
modify every usage in the tests - and there are a bunch of them. To
solve this, we introduce a new constructor which requires passing the
`for_tests` tag, marking that the constructor is only supposed to be
used in tests (and the constructor has an appropriate comment). This
constructor uses default values, but the other constructors - used in
'production code' - do not.
This commit is contained in:
Kamil Braun
2021-09-13 16:38:07 +02:00
parent 8386b55e9c
commit fbb83dd5ca
9 changed files with 31 additions and 17 deletions

View File

@@ -340,14 +340,16 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _streaming_concurrency_sem(
max_count_streaming_concurrent_reads,
max_memory_streaming_concurrent_reads(),
"_streaming_concurrency_sem")
"_streaming_concurrency_sem",
std::numeric_limits<size_t>::max())
// No limits, just for accounting.
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction")
, _system_read_concurrency_sem(
// Using higher initial concurrency, see revert_initial_system_read_concurrency_boost().
max_count_concurrent_reads,
max_memory_system_concurrent_reads(),
"_system_read_concurrency_sem")
"_system_read_concurrency_sem",
std::numeric_limits<size_t>::max())
, _row_cache_tracker(cache_tracker::register_metrics::yes)
, _apply_stage("db_apply", &database::do_apply)
, _version(empty_version)

View File

@@ -625,7 +625,8 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
: reader_concurrency_semaphore(
std::numeric_limits<int>::max(),
std::numeric_limits<ssize_t>::max(),
std::move(name)) {}
std::move(name),
std::numeric_limits<size_t>::max()) {}
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
if (!_stats.total_permits) {

View File

@@ -239,7 +239,7 @@ public:
reader_concurrency_semaphore(int count,
ssize_t memory,
sstring name,
size_t max_queue_length = std::numeric_limits<size_t>::max());
size_t max_queue_length);
/// Create a semaphore with practically unlimited count and memory.
///
@@ -247,6 +247,17 @@ public:
/// The semaphore's name has to be unique!
explicit reader_concurrency_semaphore(no_limits, sstring name);
/// A helper constructor *only for tests* that supplies default arguments.
/// The other constructors have default values removed so 'production-code'
/// is forced to specify all of them manually to avoid bugs.
struct for_tests{};
reader_concurrency_semaphore(for_tests, sstring name,
int count = std::numeric_limits<int>::max(),
ssize_t memory = std::numeric_limits<ssize_t>::max(),
size_t max_queue_length = std::numeric_limits<size_t>::max())
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length)
{}
~reader_concurrency_semaphore();
reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete;

View File

@@ -425,7 +425,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
simple_schema s;
reader_concurrency_semaphore sem(1, 100, get_name());
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
auto stop_sem = deferred_stop(sem);
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);

View File

@@ -3419,7 +3419,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
reader_concurrency_semaphore semaphore(1, 0, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);

View File

@@ -163,7 +163,7 @@ public:
};
test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h, ssize_t max_memory = std::numeric_limits<ssize_t>::max())
: _sem(std::numeric_limits<int>::max(), max_memory, "test_querier_cache")
: _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits<int>::max(), max_memory)
, _cache(entry_ttl)
, _mutations(make_mutations(_s, external_make_value))
, _mutation_source([this] (schema_ptr, reader_permit permit, const dht::partition_range& range) {

View File

@@ -63,7 +63,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_releases_units) {
simple_schema s;
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
auto stop_sem = deferred_stop(semaphore);
// Not admitted, active
@@ -123,7 +123,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
simple_schema s;
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
const auto base_resources = reader_concurrency_semaphore::resources{1, 1024};
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
auto stop_sem = deferred_stop(semaphore);
@@ -279,7 +279,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
#endif
simple_schema s;
reader_concurrency_semaphore semaphore(count, count * 1024, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), count, count * 1024);
auto stop_sem = deferred_stop(semaphore);
std::vector<std::unique_ptr<reader>> readers;
@@ -396,7 +396,7 @@ class dummy_file_impl : public file_impl {
SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
return async([&] {
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024);
auto stop_sem = deferred_stop(semaphore);
auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get();
@@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
return async([&] () {
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 2, new_reader_base_cost);
auto stop_sem = deferred_stop(semaphore);
{
@@ -496,7 +496,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
return async([&] () {
reader_concurrency_semaphore semaphore(1, new_reader_base_cost, get_name(), 2);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost, 2);
auto stop_sem = deferred_stop(semaphore);
{
@@ -599,7 +599,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
simple_schema s;
const auto schema_ptr = s.schema().get();
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
auto stop_sem = deferred_stop(semaphore);
auto require_can_admit = [&] (bool expected_can_admit, const char* description,
@@ -841,7 +841,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
auto stop_sem = deferred_stop(semaphore);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 0);

View File

@@ -796,7 +796,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
}
};
reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name());
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost);
auto stop_sem = deferred_stop(sem);
auto schema = schema_builder("ks", "cf")

View File

@@ -96,7 +96,7 @@ public:
auto name = format("tests::reader_lifecycle_policy@{}@shard_id={}", fmt::ptr(this), shard);
if (_evict_paused_readers) {
// Create with no memory, so all inactive reads are immediately evicted.
_contexts[shard]->semaphore.emplace(1, 0, std::move(name));
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0);
} else {
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name));
}