replica/database: introduce foreach_reader_concurrency_semaphore

Currently we have a single method -- detach_column_family() -- which
does something with each semaphore. Soon there will be another one.
Introduce a method to do something with all semaphores, to make this
smoother. Enterprise has a different set of semaphores, and this will
reduce friction.
This commit is contained in:
Botond Dénes
2024-04-30 01:36:56 -04:00
parent 3c813fbb99
commit 338af5055c
2 changed files with 11 additions and 3 deletions

View File

@@ -976,9 +976,9 @@ future<> database::detach_column_family(table& cf) {
co_await remove(cf);
cf.clear_views();
co_await cf.await_pending_ops();
for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) {
co_await sem->evict_inactive_reads_for_table(uuid);
}
co_await foreach_reader_concurrency_semaphore([uuid] (reader_concurrency_semaphore& sem) -> future<> {
co_await sem.evict_inactive_reads_for_table(uuid);
});
}
global_table_ptr::global_table_ptr() {
@@ -1696,6 +1696,12 @@ bool database::is_user_semaphore(const reader_concurrency_semaphore& semaphore)
&& &semaphore != &_system_read_concurrency_sem;
}
future<> database::foreach_reader_concurrency_semaphore(std::function<future<>(reader_concurrency_semaphore&)> func) {
for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) {
co_await func(*sem);
}
}
std::ostream& operator<<(std::ostream& out, const column_family& cf) {
fmt::print(out, "{{column_family: {}/{}}}", cf._schema->ks_name(), cf._schema->cf_name());
return out;

View File

@@ -1551,6 +1551,8 @@ private:
void drop_keyspace(const sstring& name);
future<> update_keyspace(const keyspace_metadata& tmp_ksm);
static future<> modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func, std::function<future<>(replica::database&)> notifier);
future<> foreach_reader_concurrency_semaphore(std::function<future<>(reader_concurrency_semaphore&)> func);
public:
static table_schema_version empty_version;