database: apply per-partition rate limiting for reads/writes

Adds the `db::rate_limiter` to the `database` class and modifies the
`query` and `apply` methods so that they account the read/write
operations in the rate limiter and optionally reject them.
This commit is contained in:
Piotr Dulikowski
2022-05-16 10:57:30 +02:00
parent ec635ba170
commit cc9a2ad41f
2 changed files with 83 additions and 9 deletions

View File

@@ -7,6 +7,7 @@
*/
#include "log.hh"
#include "replica/database_fwd.hh"
#include "utils/lister.hh"
#include "replica/database.hh"
#include <seastar/core/future-util.hh>
@@ -63,6 +64,7 @@
#include "tombstone_gc.hh"
#include "replica/data_dictionary_impl.hh"
#include "replica/exceptions.hh"
#include "readers/multi_range.hh"
#include "readers/multishard.hh"
@@ -1318,15 +1320,51 @@ request_class classify_request(const database_config& _dbcfg) {
} // anonymous namespace
static db::rate_limiter::can_proceed account_singular_ranges_to_rate_limit(
db::rate_limiter& limiter, column_family& cf,
const dht::partition_range_vector& ranges,
const database_config& dbcfg,
db::per_partition_rate_limit::info rate_limit_info) {
using can_proceed = db::rate_limiter::can_proceed;
if (std::holds_alternative<std::monostate>(rate_limit_info) || !can_apply_per_partition_rate_limit(*cf.schema(), dbcfg, db::operation_type::read)) {
// Rate limiting is disabled for this query
return can_proceed::yes;
}
auto table_limit = *cf.schema()->per_partition_rate_limit_options().get_max_reads_per_second();
can_proceed ret = can_proceed::yes;
auto& read_label = cf.get_rate_limiter_label_for_reads();
for (const auto& range : ranges) {
if (!range.is_singular()) {
continue;
}
auto token = dht::token::to_int64(ranges.front().start()->value().token());
if (limiter.account_operation(read_label, token, table_limit, rate_limit_info) == db::rate_limiter::can_proceed::no) {
// Don't return immediately - account all ranges first
ret = can_proceed::no;
}
}
return ret;
}
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
const auto reversed = cmd.slice.is_reversed();
if (reversed) {
s = s->make_reversed();
}
column_family& cf = find_column_family(cmd.cf_id);
if (account_singular_ranges_to_rate_limit(_rate_limiter, cf, ranges, _dbcfg, rate_limit_info) == db::rate_limiter::can_proceed::no) {
co_await coroutine::return_exception(replica::rate_limit_exception());
}
auto& semaphore = get_reader_concurrency_semaphore();
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
@@ -1770,13 +1808,22 @@ future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db
}
}
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info) {
// I'm doing a nullcheck here since the init code path for db etc
// is a little in flux and commitlog is created only when db is
// initied from datadir.
auto uuid = m.column_family_id();
auto& cf = find_column_family(uuid);
if (!std::holds_alternative<std::monostate>(rate_limit_info) && can_apply_per_partition_rate_limit(*s, db::operation_type::write)) {
auto table_limit = *s->per_partition_rate_limit_options().get_max_writes_per_second();
auto& write_label = cf.get_rate_limiter_label_for_writes();
auto token = dht::token::to_int64(dht::get_token(*s, m.key()));
if (_rate_limiter.account_operation(write_label, token, table_limit, rate_limit_info) == db::rate_limiter::can_proceed::no) {
co_await coroutine::return_exception(replica::rate_limit_exception());
}
}
sync = sync || db::commitlog::force_sync(s->wait_for_sync_to_commitlog());
// Signal to view building code that a write is in progress,
@@ -1833,7 +1880,7 @@ void database::update_write_metrics_for_timed_out_write() {
++_stats->total_writes_timedout;
}
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
if (dblog.is_enabled(logging::log_level::trace)) {
dblog.trace("apply {}", m.pretty_printer(s));
}
@@ -1844,7 +1891,7 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_
if (!s->is_synced()) {
on_internal_error(dblog, format("attempted to apply mutation using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync));
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info));
}
future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) {
@@ -1855,7 +1902,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no));
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}));
});
}

View File

@@ -66,6 +66,8 @@
#include "absl-flat_hash_map.hh"
#include "utils/cross-shard-barrier.hh"
#include "sstables/generation_type.hh"
#include "db/rate_limiter.hh"
#include "db/per_partition_rate_limit_info.hh"
class cell_locker;
class cell_locker_stats;
@@ -452,6 +454,11 @@ private:
std::vector<view_ptr> _views;
std::unique_ptr<cell_locker> _counter_cell_locks; // Memory-intensive; allocate only when needed.
// Labels used to identify writes and reads for this table in the rate_limiter structure.
db::rate_limiter::label _rate_limiter_label_for_writes;
db::rate_limiter::label _rate_limiter_label_for_reads;
void set_metrics();
seastar::metrics::metric_groups _metrics;
@@ -744,6 +751,23 @@ public:
return _cache;
}
db::rate_limiter::label& get_rate_limiter_label_for_op_type(db::operation_type op_type) {
switch (op_type) {
case db::operation_type::write:
return _rate_limiter_label_for_writes;
case db::operation_type::read:
return _rate_limiter_label_for_reads;
}
}
db::rate_limiter::label& get_rate_limiter_label_for_writes() {
return _rate_limiter_label_for_writes;
}
db::rate_limiter::label& get_rate_limiter_label_for_reads() {
return _rate_limiter_label_for_reads;
}
future<std::vector<locked_cell>> lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout);
logalloc::occupancy_stats occupancy() const;
@@ -1294,7 +1318,8 @@ private:
const frozen_mutation&,
tracing::trace_state_ptr,
db::timeout_clock::time_point,
db::commitlog_force_sync> _apply_stage;
db::commitlog_force_sync,
db::per_partition_rate_limit::info> _apply_stage;
flat_hash_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
@@ -1332,6 +1357,8 @@ private:
std::unique_ptr<wasm::engine> _wasm_engine;
utils::cross_shard_barrier _stop_barrier;
db::rate_limiter _rate_limiter;
public:
data_dictionary::database as_data_dictionary() const;
std::shared_ptr<data_dictionary::user_types_storage> as_user_types_storage() const noexcept;
@@ -1361,7 +1388,7 @@ private:
void setup_metrics();
void setup_scylla_memory_diagnostics_producer();
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync);
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info);
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
future<mutation> do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,
@@ -1487,12 +1514,12 @@ public:
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> query(schema_ptr, const query::read_command& cmd, query::result_options opts,
const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout);
db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
future<std::tuple<reconcilable_result, cache_temperature>> query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout);
// Apply the mutation atomically.
// Throws timed_out_error when timeout is reached.
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout);
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);