diff --git a/db/hints/internal/hint_sender.cc b/db/hints/internal/hint_sender.cc index 20b520aab4..2dc0a3edfa 100644 --- a/db/hints/internal/hint_sender.cc +++ b/db/hints/internal/hint_sender.cc @@ -154,7 +154,10 @@ hint_sender::~hint_sender() { future<> hint_sender::stop(drain should_drain) noexcept { - return seastar::async([this, should_drain] { + seastar::thread_attributes attr; + + attr.sched_group = _hints_cpu_sched_group; + return seastar::async(std::move(attr), [this, should_drain] { set_stopping(); _stop_as.request_abort(); _stopped.get(); diff --git a/replica/database.cc b/replica/database.cc index 7dff5c808e..fa15a5291b 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2415,9 +2415,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t if (!s->is_synced()) { on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } - return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable { - return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}); - }); + return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}); } keyspace::config diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 4275bbcd3b..3cb90e5ffb 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -94,6 +94,7 @@ #include "locator/util.hh" #include "tools/build_info.hh" #include "utils/labels.hh" +#include "debug.hh" namespace bi = boost::intrusive; @@ -1522,6 +1523,9 @@ public: virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, const locator::effective_replication_map& erm) override { + if (current_scheduling_group() != debug::streaming_scheduling_group) { + on_internal_error(dblog, format("attempted to apply hint in {} scheduling group", current_scheduling_group().name())); + } // A hint will be sent to all relevant endpoints when the endpoint it was originally intended for // becomes unavailable - this might include the current node return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout);