database: apply per-partition rate limiting for reads/writes
Adds a `db::rate_limiter` instance to the `database` class and modifies the `query` and `apply` methods so that read and write operations are accounted for in the rate limiter and, when a partition's limit is exceeded, optionally rejected with `rate_limit_exception`.
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
#include "log.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "utils/lister.hh"
|
||||
#include "replica/database.hh"
|
||||
#include <seastar/core/future-util.hh>
|
||||
@@ -63,6 +64,7 @@
|
||||
#include "tombstone_gc.hh"
|
||||
|
||||
#include "replica/data_dictionary_impl.hh"
|
||||
#include "replica/exceptions.hh"
|
||||
#include "readers/multi_range.hh"
|
||||
#include "readers/multishard.hh"
|
||||
|
||||
@@ -1318,15 +1320,51 @@ request_class classify_request(const database_config& _dbcfg) {
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
|
||||
// Accounts all singular (single-partition) ranges of a read against the
// per-partition rate limiter and decides whether the read may proceed.
//
// Non-singular ranges (range scans) are exempt from per-partition rate
// limiting and are skipped. If rate limiting is disabled for this query
// (no rate limit info, or the table/config does not allow applying it to
// reads), the read always proceeds.
//
// Returns can_proceed::no if ANY singular range exceeded its partition's
// limit; all ranges are still accounted first so that rejected requests
// continue to contribute to the counters.
static db::rate_limiter::can_proceed account_singular_ranges_to_rate_limit(
        db::rate_limiter& limiter, column_family& cf,
        const dht::partition_range_vector& ranges,
        const database_config& dbcfg,
        db::per_partition_rate_limit::info rate_limit_info) {
    using can_proceed = db::rate_limiter::can_proceed;

    if (std::holds_alternative<std::monostate>(rate_limit_info) || !can_apply_per_partition_rate_limit(*cf.schema(), dbcfg, db::operation_type::read)) {
        // Rate limiting is disabled for this query
        return can_proceed::yes;
    }

    auto table_limit = *cf.schema()->per_partition_rate_limit_options().get_max_reads_per_second();
    can_proceed ret = can_proceed::yes;

    auto& read_label = cf.get_rate_limiter_label_for_reads();
    for (const auto& range : ranges) {
        if (!range.is_singular()) {
            continue;
        }
        // BUGFIX: account the token of the CURRENT range, not ranges.front();
        // previously every singular range was charged against the first
        // range's partition, making multi-range accounting incorrect.
        auto token = dht::token::to_int64(range.start()->value().token());
        if (limiter.account_operation(read_label, token, table_limit, rate_limit_info) == db::rate_limiter::can_proceed::no) {
            // Don't return immediately - account all ranges first
            ret = can_proceed::no;
        }
    }

    return ret;
}
|
||||
|
||||
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
|
||||
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
|
||||
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
|
||||
const auto reversed = cmd.slice.is_reversed();
|
||||
if (reversed) {
|
||||
s = s->make_reversed();
|
||||
}
|
||||
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
|
||||
if (account_singular_ranges_to_rate_limit(_rate_limiter, cf, ranges, _dbcfg, rate_limit_info) == db::rate_limiter::can_proceed::no) {
|
||||
co_await coroutine::return_exception(replica::rate_limit_exception());
|
||||
}
|
||||
|
||||
auto& semaphore = get_reader_concurrency_semaphore();
|
||||
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
|
||||
|
||||
@@ -1770,13 +1808,22 @@ future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db
|
||||
}
|
||||
}
|
||||
|
||||
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
|
||||
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info) {
|
||||
// I'm doing a nullcheck here since the init code path for db etc
|
||||
// is a little in flux and commitlog is created only when db is
|
||||
// initied from datadir.
|
||||
auto uuid = m.column_family_id();
|
||||
auto& cf = find_column_family(uuid);
|
||||
|
||||
if (!std::holds_alternative<std::monostate>(rate_limit_info) && can_apply_per_partition_rate_limit(*s, db::operation_type::write)) {
|
||||
auto table_limit = *s->per_partition_rate_limit_options().get_max_writes_per_second();
|
||||
auto& write_label = cf.get_rate_limiter_label_for_writes();
|
||||
auto token = dht::token::to_int64(dht::get_token(*s, m.key()));
|
||||
if (_rate_limiter.account_operation(write_label, token, table_limit, rate_limit_info) == db::rate_limiter::can_proceed::no) {
|
||||
co_await coroutine::return_exception(replica::rate_limit_exception());
|
||||
}
|
||||
}
|
||||
|
||||
sync = sync || db::commitlog::force_sync(s->wait_for_sync_to_commitlog());
|
||||
|
||||
// Signal to view building code that a write is in progress,
|
||||
@@ -1833,7 +1880,7 @@ void database::update_write_metrics_for_timed_out_write() {
|
||||
++_stats->total_writes_timedout;
|
||||
}
|
||||
|
||||
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
|
||||
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
|
||||
if (dblog.is_enabled(logging::log_level::trace)) {
|
||||
dblog.trace("apply {}", m.pretty_printer(s));
|
||||
}
|
||||
@@ -1844,7 +1891,7 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_
|
||||
if (!s->is_synced()) {
|
||||
on_internal_error(dblog, format("attempted to apply mutation using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync));
|
||||
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info));
|
||||
}
|
||||
|
||||
future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) {
|
||||
@@ -1855,7 +1902,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
|
||||
on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
|
||||
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no));
|
||||
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,8 @@
|
||||
#include "absl-flat_hash_map.hh"
|
||||
#include "utils/cross-shard-barrier.hh"
|
||||
#include "sstables/generation_type.hh"
|
||||
#include "db/rate_limiter.hh"
|
||||
#include "db/per_partition_rate_limit_info.hh"
|
||||
|
||||
class cell_locker;
|
||||
class cell_locker_stats;
|
||||
@@ -452,6 +454,11 @@ private:
|
||||
std::vector<view_ptr> _views;
|
||||
|
||||
std::unique_ptr<cell_locker> _counter_cell_locks; // Memory-intensive; allocate only when needed.
|
||||
|
||||
// Labels used to identify writes and reads for this table in the rate_limiter structure.
|
||||
db::rate_limiter::label _rate_limiter_label_for_writes;
|
||||
db::rate_limiter::label _rate_limiter_label_for_reads;
|
||||
|
||||
void set_metrics();
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
|
||||
@@ -744,6 +751,23 @@ public:
|
||||
return _cache;
|
||||
}
|
||||
|
||||
// Returns the rate_limiter label which identifies operations of the given
// type (read or write) on this table inside the database's db::rate_limiter.
//
// BUGFIX: the original switch had no return path after it, so control could
// flow off the end of a value-returning function (undefined behavior) if the
// enum ever gained a value or held an invalid one. Restructured so every
// path returns; behavior for the two valid values is unchanged.
db::rate_limiter::label& get_rate_limiter_label_for_op_type(db::operation_type op_type) {
    if (op_type == db::operation_type::write) {
        return _rate_limiter_label_for_writes;
    }
    // db::operation_type::read
    return _rate_limiter_label_for_reads;
}
|
||||
|
||||
// Convenience accessor: the label identifying this table's writes in the
// database-global rate limiter. Equivalent to asking for the write op type.
db::rate_limiter::label& get_rate_limiter_label_for_writes() {
    return get_rate_limiter_label_for_op_type(db::operation_type::write);
}
|
||||
|
||||
// Convenience accessor: the label identifying this table's reads in the
// database-global rate limiter. Equivalent to asking for the read op type.
db::rate_limiter::label& get_rate_limiter_label_for_reads() {
    return get_rate_limiter_label_for_op_type(db::operation_type::read);
}
|
||||
|
||||
future<std::vector<locked_cell>> lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout);
|
||||
|
||||
logalloc::occupancy_stats occupancy() const;
|
||||
@@ -1294,7 +1318,8 @@ private:
|
||||
const frozen_mutation&,
|
||||
tracing::trace_state_ptr,
|
||||
db::timeout_clock::time_point,
|
||||
db::commitlog_force_sync> _apply_stage;
|
||||
db::commitlog_force_sync,
|
||||
db::per_partition_rate_limit::info> _apply_stage;
|
||||
|
||||
flat_hash_map<sstring, keyspace> _keyspaces;
|
||||
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
|
||||
@@ -1332,6 +1357,8 @@ private:
|
||||
std::unique_ptr<wasm::engine> _wasm_engine;
|
||||
utils::cross_shard_barrier _stop_barrier;
|
||||
|
||||
db::rate_limiter _rate_limiter;
|
||||
|
||||
public:
|
||||
data_dictionary::database as_data_dictionary() const;
|
||||
std::shared_ptr<data_dictionary::user_types_storage> as_user_types_storage() const noexcept;
|
||||
@@ -1361,7 +1388,7 @@ private:
|
||||
void setup_metrics();
|
||||
void setup_scylla_memory_diagnostics_producer();
|
||||
|
||||
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync);
|
||||
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info);
|
||||
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
|
||||
|
||||
future<mutation> do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,
|
||||
@@ -1487,12 +1514,12 @@ public:
|
||||
|
||||
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> query(schema_ptr, const query::read_command& cmd, query::result_options opts,
|
||||
const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
|
||||
future<std::tuple<reconcilable_result, cache_temperature>> query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range,
|
||||
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout);
|
||||
// Apply the mutation atomically.
|
||||
// Throws timed_out_error when timeout is reached.
|
||||
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout);
|
||||
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
|
||||
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
|
||||
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
|
||||
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
|
||||
|
||||
Reference in New Issue
Block a user