Files
scylladb/db/rate_limiter.cc
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

306 lines
12 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <cmath>
#include <numbers>
#include <array>
#include <random>
#include <variant>
#include <chrono>
#include <seastar/core/metrics.hh>
#include "utils/small_vector.hh"
#include "utils/murmur_hash.hh"
#include "db/rate_limiter.hh"
// The rate limiter keeps a hashmap of counters differentiated by operation type
// (e.g. read or write) and the partition token. On each operation,
// the corresponding counter is increased by 1.
//
// The counters are decremented via two mechanisms:
//
// 1. Every `time_window_duration`, all counters are halved.
// 2. Within a time window, on every `bucket_size` operations all counters
// are decremented by 1.
//
// Mechanism 1) makes sure that we do not forget about very frequent
// operations too quickly, and makes it possible to reject operations in a
// probabilistic manner (this is described in more detail in the design notes).
//
// The mechanism 2) protects the internal hashmap from being flooded with
// counters with low values. This causes the rate limiter to underestimate
// the counter values by the current number of "buckets" within this
// time window. This strategy is also known as "lossy counting".
//
// Both mechanisms 1) and 2) are implemented in a lazy manner.
namespace db {
// The entry table is indexed by the low `hash_bits` bits of the probe
// position, i.e. it always holds 2^hash_bits entries.
static constexpr size_t hash_bits = 16;
static constexpr size_t entry_count = size_t{1} << hash_bits;
// Lossy-counting granularity: after every `bucket_size` operations within a
// time window, all counters are virtually decremented by one.
static constexpr size_t bucket_size = 10'000;
void rate_limiter_base::on_timer() noexcept {
    // Advances the limiter to the next time window.

    // Archive the statistics of the window that just ended at the front of
    // the history (index 0 is the most recent past window) and drop the
    // oldest record, keeping the history length constant.
    _time_window_history.pop_back();
    _time_window_history.insert(_time_window_history.begin(), time_window_entry {
        .entries_active = _current_entries_in_time_window,
        .lossy_counting_decrease = _current_bucket,
    });

    // Open the next window with clean per-window counters. The window id is
    // a `time_window_bits`-bit number and wraps around.
    _current_time_window = (_current_time_window + 1) % (1 << time_window_bits);
    _current_entries_in_time_window = 0;
    _current_ops_in_bucket = 0;
    _current_bucket = 0;

    // With 12-bit window ids advancing once per second, the id wraps every
    // 4096 seconds (a bit over an hour). An entry that was last updated
    // exactly one wraparound ago would then carry the current id again and
    // wrongly look fresh.
    //
    // To rule that out, every entry is refreshed at least once per half
    // wraparound period (2048 windows). Rather than sweeping the whole table
    // at once, the sweep is spread over the window switches: each switch
    // refreshes the next `slots_per_tick` consecutive slots. As a result no
    // entry's window id ever lags the current one by more than 2048.
    constexpr size_t refresh_period = 1 << (time_window_bits - 1);
    constexpr size_t slots_per_tick = entry_count / refresh_period;
    const size_t first_slot = _current_time_window * slots_per_tick;
    for (size_t slot = first_slot; slot != first_slot + slots_per_tick; ++slot) {
        entry_refresh(_entries[slot % entry_count]);
    }
}
rate_limiter_base::entry* rate_limiter_base::get_entry(uint32_t label, uint64_t token) noexcept {
    // Returns the entry tracking (label, token). If no such entry exists,
    // an empty/expired slot on the probe chain is claimed for it. Returns
    // nullptr if neither is found within `max_probes` probes; the caller
    // then admits the operation without tracking it.
    //
    // Collisions are resolved with quadratic probing: the i-th probe lands
    // at offset 0 + 1 + ... + i from the home slot (triangular numbers),
    // modulo the table size.
    //
    // Entries expire lazily (see entry_is_empty). So an entry A can become
    // unreachable when an entry B that precedes it on the probe chain
    // expires. Afterwards either:
    //
    // 1. a new entry is allocated over B, making A reachable again, or
    // 2. a new entry is allocated for A's (label, token) earlier on the
    //    chain, and A eventually expires.
    //
    // In the worst case A is a hot, actively rate-limited entry, and many
    // operations get admitted. But the replacement sits earlier on the probe
    // chain, so this happens only a limited number of times (if at all) for
    // any single hot entry.
    static constexpr size_t max_probes = 32;
    size_t slot = compute_hash(label, token);
    for (size_t attempt = 0; attempt != max_probes; ++attempt) {
        // Each step jumps one slot further than the previous one.
        slot = (slot + attempt) % entry_count;
        entry& candidate = _entries[slot];
        ++_metrics.probe_count;
        // Apply pending decay before inspecting the entry.
        entry_refresh(candidate);
        if (entry_is_empty(candidate)) {
            // Claim the slot. Starting at `_current_bucket` makes the
            // effective (lossy-counted) value zero.
            ++_metrics.allocations_on_empty;
            candidate.token = token;
            candidate.label = label;
            candidate.op_count = _current_bucket;
            return &candidate;
        }
        if (candidate.label == label && candidate.token == token) {
            ++_metrics.successful_lookups;
            return &candidate;
        }
    }
    ++_metrics.failed_allocations;
    return nullptr;
}
size_t rate_limiter_base::compute_hash(uint32_t label, uint64_t token) noexcept {
    // Hashes the key (token, label), salted with this instance's `_salt`.
    // The three fields are laid out back to back in a byte buffer and
    // hashed with 128-bit MurmurHash3 for good distribution. The first
    // 64-bit half of the digest is returned.
    static constexpr size_t key_length = sizeof(token) + sizeof(label) + sizeof(_salt);
    std::array<uint8_t, key_length> key;
    size_t offset = 0;
    memcpy(key.data() + offset, &token, sizeof(token));
    offset += sizeof(token);
    memcpy(key.data() + offset, &label, sizeof(label));
    offset += sizeof(label);
    memcpy(key.data() + offset, &_salt, sizeof(_salt));

    std::array<uint64_t, 2> digest;
    utils::murmur_hash::hash3_x64_128(key.data(), key_length, 0, digest);
    return digest[0];
}
void rate_limiter_base::entry_refresh(rate_limiter_base::entry& b) noexcept {
    // Brings entry `b` up to date with the current time window. It lazily
    // applies everything that happened since the entry was last touched:
    // the lossy-counting decrease of each elapsed window, and the halving
    // performed on every window switch. Afterwards the entry is accounted
    // as active in the current window.
    //
    // Window ids wrap around modulo 2^time_window_bits (see on_timer), so the
    // entry's age must be computed with the same modulus. A plain unsigned
    // subtraction yields a huge value once the current id has wrapped past
    // the entry's id (e.g. current id 0, entry id 2^time_window_bits - 1).
    // An entry touched one window ago would then look ancient: its counter
    // would be wiped, and its window's `entries_active` would not be
    // decremented. on_timer refreshes every entry at least once every
    // 2^(time_window_bits - 1) windows, so the masked delta is always the
    // entry's true age.
    constexpr uint32_t time_window_mask = (uint32_t{1} << time_window_bits) - 1;
    uint32_t window_delta =
            (static_cast<uint32_t>(_current_time_window) - static_cast<uint32_t>(b.time_window)) & time_window_mask;
    if (window_delta == 0) {
        // Already refreshed (or allocated) in this time window.
        return;
    }
    if (window_delta < _time_window_history.size()) {
        // The entry was accounted as active in the window it was last
        // refreshed in. It is moving to the current window.
        --_time_window_history[window_delta - 1].entries_active;
        // Replay the elapsed windows, oldest first. For each one, subtract
        // that window's lossy-counting decrease (saturating at zero), then
        // halve.
        while (window_delta > 0) {
            const auto& past = _time_window_history[window_delta - 1];
            if (b.op_count > past.lossy_counting_decrease) {
                b.op_count -= past.lossy_counting_decrease;
            } else {
                b.op_count = 0;
            }
            b.op_count /= 2;
            --window_delta;
        }
    } else {
        // The entry is very old, and its op_count can safely be reset to zero.
        b.op_count = 0;
    }
    ++_current_entries_in_time_window;
    b.time_window = _current_time_window;
}
bool rate_limiter_base::entry_is_empty(const rate_limiter_base::entry& b) noexcept {
    // Lossy counting virtually subtracts `_current_bucket` from every counter.
    // An entry whose raw count does not exceed it therefore has an effective
    // count of zero, and its slot may be reused.
    return !(b.op_count > _current_bucket);
}
void rate_limiter_base::register_metrics() {
namespace sm = seastar::metrics;
_metric_group.add_group("per_partition_rate_limiter", {
// TODO: Most of the following metrics are pretty low-level and not useful for users,
// perhaps they should be hidden behind a configuration flag
sm::make_counter("allocations", _metrics.allocations_on_empty,
sm::description("Number of times a entry was allocated over an empty/expired entry.")),
sm::make_counter("successful_lookups", _metrics.successful_lookups,
sm::description("Number of times a lookup returned an already allocated entry.")),
sm::make_counter("failed_allocations", _metrics.failed_allocations,
sm::description("Number of times the rate limiter gave up trying to allocate.")),
sm::make_counter("probe_count", _metrics.probe_count,
sm::description("Number of probes made during lookups.")),
sm::make_gauge("load_factor", [&] {
uint32_t occupied_entry_count = _current_entries_in_time_window;
for (const auto& twe : _time_window_history) {
occupied_entry_count += twe.entries_active;
}
return double(occupied_entry_count) / double(entry_count);
},
sm::description("Current load factor of the hash table (upper bound, may be overestimated).")),
});
}
// Creates a rate limiter with a fixed-size table of `entry_count` entries
// and registers its metrics.
// - `_salt` is seeded from std::random_device and mixed into every key in
//   compute_hash, which randomizes the (label, token) -> slot mapping.
// - `_time_window_history` keeps one record per past time window. op_count
//   is clamped to 2^op_count_bits - 1 and halved on every window switch, so
//   after op_count_bits - 1 windows any counter has decayed to at most 1.
//   NOTE(review): that is presumably why the history has this length;
//   confirm with the design notes.
rate_limiter_base::rate_limiter_base()
    : _salt(std::random_device{}())
    , _entries(entry_count)
    , _time_window_history(op_count_bits - 1) {
    register_metrics();
}
uint64_t rate_limiter_base::increase_and_get_counter(label& l, uint64_t token) noexcept {
    // Records one more operation for (l, token) and returns the estimated
    // operation count. That estimate is the raw counter minus the
    // lossy-counting decrease accumulated so far in the current time window.
    // Returns 0 when get_entry could not find or allocate an entry.

    // Label ids are handed out lazily; 0 means "not assigned yet".
    if (l._label == 0) {
        l._label = _next_label++;
    }

    entry* const slot = get_entry(l._label, token);
    if (slot == nullptr) {
        // get_entry gave up after its probe limit. This partition is not
        // tracked during this time window, so admitting the operation is
        // assumed to be acceptable.
        return 0;
    }

    // Saturate rather than overflow the op_count_bits-wide counter.
    constexpr uint32_t op_count_max = (1 << op_count_bits) - 1;
    slot->op_count = std::min<uint32_t>(op_count_max, slot->op_count + 1);

    // Every `bucket_size` operations, all counters are virtually decremented
    // by one. The entries themselves are never touched; `_current_bucket` is
    // subtracted whenever a counter is compared with a limit.
    if (++_current_ops_in_bucket >= bucket_size) {
        ++_current_bucket;
        _current_ops_in_bucket -= bucket_size;
    }
    return slot->op_count - _current_bucket;
}
rate_limiter_base::can_proceed rate_limiter_base::account_operation(
        label& l, uint64_t token, uint64_t limit,
        const db::per_partition_rate_limit::info& rate_limit_info) noexcept {
    // Accounts one operation on (l, token) and decides whether it may
    // proceed.
    // - monostate:          rate limiting is off; nothing is counted.
    // - account_and_enforce: count the operation, then enforce `limit`.
    // - any other alternative: count the operation and always admit it.

    if (std::holds_alternative<std::monostate>(rate_limit_info)) {
        // Rate limiting turned off
        return can_proceed::yes;
    }

    const uint64_t count = increase_and_get_counter(l, token);

    const auto* enforce = std::get_if<db::per_partition_rate_limit::account_and_enforce>(&rate_limit_info);
    if (enforce == nullptr) {
        // Accounting only: the counter was bumped, nothing to enforce.
        return can_proceed::yes;
    }

    // Counters are halved on every time window switch. So for a partition
    // receiving X operations per window, the counter oscillates between about
    // X at the start of a window and 2X at its end. Anything up to 2 * limit
    // is therefore within budget.
    if (count <= 2 * limit) {
        return can_proceed::yes;
    }

    // Above that, accept with probability P(x) = limit / (x * ln 2).
    // Integrating P over [X, 2X] gives exactly `limit`, so about `limit`
    // operations are accepted per window at a steady rate. With the random
    // variable r (presumably uniform on [0, 1) - confirm in
    // per_partition_rate_limit), r < P(count) is equivalent to the test
    // below.
    //
    // All replicas draw the same r and are expected to have counters
    // oscillating between the same values. So most of the time they reach
    // the same verdict.
    const double scaled = enforce->get_random_variable_as_double() * double(count) * std::numbers::ln2;
    return scaled < double(limit) ? can_proceed::yes : can_proceed::no;
}
// Explicitly instantiate generic_rate_limiter for seastar::lowres_clock in
// this translation unit.
template class generic_rate_limiter<seastar::lowres_clock>;
}