scylladb/service/storage_proxy_stats.hh
Avi Kivity dab56b82fa Merge 'Per-partition rate limiting' from Piotr Dulikowski
Due to its sharded, token-based architecture, Scylla works best when the user workload is spread more or less uniformly across all nodes and shards. A common case in which this assumption breaks is the "hot partition": suddenly, a single partition starts receiving far more reads and writes than the other partitions. Because the shards owning that partition have only a fraction of the total cluster capacity, this quickly causes latency problems for other partitions on the same shard and vnode.

This PR introduces a per-partition rate limiting feature. Users can now apply per-partition limits to tables of their choice using a schema extension:

```
ALTER TABLE ks.tbl
WITH per_partition_rate_limit = {
	'max_writes_per_second': 100,
	'max_reads_per_second': 200
};
```

Reads and writes detected to exceed that quota are rejected with a new RATE_LIMIT_ERROR CQL error code - the existing error codes did not fit the rate limit error well, so a new one was added. The code is implemented as part of a CQL protocol extension and is returned only to clients that requested the extension; otherwise, the existing CONFIG_ERROR is used instead.
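The fallback logic can be sketched as follows. The enum values and function name below are placeholders for illustration (the real codes live in the CQL protocol definitions); only the selection logic mirrors the description above:

```cpp
#include <cassert>
#include <cstdint>

// Placeholder error codes for illustration only; the real numeric values
// are defined by the CQL binary protocol and the Scylla extension.
enum class cql_error : uint16_t {
    config_error = 0x2300,
    rate_limit_error = 0x4001, // hypothetical value
};

// Sketch: choose the error code to return for a rate-limited request,
// depending on whether the client negotiated the protocol extension.
cql_error pick_rate_limit_error(bool client_requested_extension) {
    return client_requested_extension
        ? cql_error::rate_limit_error
        : cql_error::config_error; // fallback for clients without the extension
}
```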

Limits are tracked and enforced on the replica side. If a write fails with some replicas reporting that the rate limit was reached, the rate limit error is propagated to the client. Additionally, the following optimization is implemented: if the coordinator shard/node is also a replica, the operation is accounted against the rate limit early, and if the limit is exceeded, an error is returned before any messages are sent to the other replicas at all.
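The replica-side accept/reject decision can be sketched as a toy fixed-window counter. The actual rate limiter added in this PR (db/rate_limiter) is shard-aware and considerably more sophisticated; everything below is an illustrative assumption, not the PR's implementation:

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Toy fixed-window counter for one partition: at most `limit` operations
// are accepted per one-second window; later operations in the same window
// are rejected (which maps to a RATE_LIMIT_ERROR on the coordinator).
class partition_rate_limiter {
    uint32_t _limit_per_second;
    uint32_t _count_in_window = 0;
    std::chrono::steady_clock::time_point _window_start;
public:
    explicit partition_rate_limiter(uint32_t limit)
        : _limit_per_second(limit)
        , _window_start(std::chrono::steady_clock::now()) {}

    // Account one operation; returns false when the per-second quota
    // for this partition is already exhausted.
    bool try_account(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) {
        if (now - _window_start >= std::chrono::seconds(1)) {
            // New window: reset the counter.
            _window_start = now;
            _count_in_window = 0;
        }
        return ++_count_in_window <= _limit_per_second;
    }
};
```

A coordinator that is also a replica would call `try_account()` before sending the request to the remaining replicas, realizing the early-rejection optimization described above.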

The PR covers regular, non-batch writes and single-partition reads. LWT and counters are not covered here.

Results of `perf_simple_query --smp=1 --operations-per-shard=1000000`:

- Write mode:
  ```
  8f690fdd47 (PR base):
  129644.11 tps ( 56.2 allocs/op,  13.2 tasks/op,   49785 insns/op)
  This PR:
  125564.01 tps ( 56.2 allocs/op,  13.2 tasks/op,   49825 insns/op)
  ```
- Read mode:
  ```
  8f690fdd47 (PR base):
  150026.63 tps ( 63.1 allocs/op,  12.1 tasks/op,   42806 insns/op)
  This PR:
  151043.00 tps ( 63.1 allocs/op,  12.1 tasks/op,   43075 insns/op)
  ```

Manual upgrade test:
- Start 3 nodes, 4 shards each, Scylla version 8f690fdd47
- Create a keyspace with scylla-bench, RF=3
- Start reading and writing with scylla-bench with CL=QUORUM
- Manually upgrade nodes one by one to the version from this PR
- Upgrade succeeded; apart from a small number of operations that failed while each node was being taken down, all reads/writes succeeded
- Successfully altered the scylla-bench table to have a read and write limit and those limits were enforced as expected

Fixes: #4703

Closes #9810

* github.com:scylladb/scylla:
  storage_proxy: metrics for per-partition rate limiting of reads
  storage_proxy: metrics for per-partition rate limiting of writes
  database: add stats for per partition rate limiting
  tests: add per_partition_rate_limit_test
  config: add add_per_partition_rate_limit_extension function for testing
  cf_prop_defs: guard per-partition rate limit with a feature
  query-request: add allow_limit flag
  storage_proxy: add allow rate limit flag to get_read_executor
  storage_proxy: resultize return type of get_read_executor
  storage_proxy: add per partition rate limit info to read RPC
  storage_proxy: add per partition rate limit info to query_result_local(_digest)
  storage_proxy: add allow rate limit flag to mutate/mutate_result
  storage_proxy: add allow rate limit flag to mutate_internal
  storage_proxy: add allow rate limit flag to mutate_begin
  storage_proxy: choose the right per partition rate limit info in write handler
  storage_proxy: resultize return types of write handler creation path
  storage_proxy: add per partition rate limit to mutation_holders
  storage_proxy: add per partition rate limit info to write RPC
  storage_proxy: add per partition rate limit info to mutate_locally
  database: apply per-partition rate limiting for reads/writes
  database: move and rename: classify_query -> classify_request
  schema: add per_partition_rate_limit schema extension
  db: add rate_limiter
  storage_proxy: propagate rate_limit_exception through read RPC
  gms: add TYPED_ERRORS_IN_READ_RPC cluster feature
  storage_proxy: pass rate_limit_exception through write RPC
  replica: add rate_limit_exception and a simple serialization framework
  docs: design doc for per-partition rate limiting
  transport: add rate_limit_error
2022-06-24 01:32:13 +03:00


/*
 * Copyright (C) 2018-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

#pragma once

#include "gms/inet_address.hh"
#include "utils/estimated_histogram.hh"
#include "utils/histogram.hh"

#include <seastar/core/metrics.hh>

namespace locator { class topology; }
namespace service {
namespace storage_proxy_stats {
// split statistics counters
struct split_stats {
    static seastar::metrics::label datacenter_label;

private:
    struct stats_counter {
        uint64_t val = 0;
    };

    // counter of operations performed on a local Node
    stats_counter _local;
    // counters of operations performed on external Nodes aggregated per Nodes' DCs
    std::unordered_map<sstring, stats_counter> _dc_stats;
    // collectd registrations container
    seastar::metrics::metric_groups _metrics;
    // a prefix string that will be used for a collectd counters' description
    sstring _short_description_prefix;
    sstring _long_description_prefix;
    // a statistics category, e.g. "client" or "replica"
    sstring _category;
    // type of operation (data/digest/mutation_data)
    sstring _op_type;
    // whether to register per-endpoint metrics automatically
    bool _auto_register_metrics;

public:
    /**
     * @param category a statistics category, e.g. "client" or "replica"
     * @param short_description_prefix a short description prefix
     * @param long_description_prefix a long description prefix
     */
    split_stats(const sstring& category, const sstring& short_description_prefix, const sstring& long_description_prefix, const sstring& op_type, bool auto_register_metrics = true);

    void register_metrics_local();
    void register_metrics_for(sstring dc, gms::inet_address ep);

    /**
     * Get a reference to the statistics counter corresponding to the given
     * destination.
     *
     * @param ep address of a destination
     *
     * @return a reference to the requested counter
     */
    uint64_t& get_ep_stat(const locator::topology& topo, gms::inet_address ep) noexcept;
};
struct write_stats {
    // total write attempts
    split_stats writes_attempts;
    split_stats writes_errors;
    split_stats background_replica_writes_failed;
    // write attempts due to Read Repair logic
    split_stats read_repair_write_attempts;

    utils::timed_rate_moving_average write_unavailables;
    utils::timed_rate_moving_average write_timeouts;
    utils::timed_rate_moving_average write_rate_limited_by_replicas;
    utils::timed_rate_moving_average write_rate_limited_by_coordinator;
    utils::timed_rate_moving_average_and_histogram write;
    utils::time_estimated_histogram estimated_write;

    utils::timed_rate_moving_average cas_write_unavailables;
    utils::timed_rate_moving_average cas_write_timeouts;
    utils::timed_rate_moving_average_and_histogram cas_write;
    utils::time_estimated_histogram estimated_cas_write;
    utils::estimated_histogram cas_write_contention;

    uint64_t writes = 0;
    // A CQL write query arrived at a non-replica node and was
    // forwarded by a coordinator to a replica
    uint64_t writes_coordinator_outside_replica_set = 0;
    // A CQL read query arrived at a non-replica node and was
    // forwarded by a coordinator to a replica
    uint64_t reads_coordinator_outside_replica_set = 0;
    uint64_t background_writes = 0; // client no longer waits for the write
    uint64_t throttled_writes = 0; // total number of writes ever delayed due to throttling
    uint64_t throttled_base_writes = 0; // current number of base writes delayed due to view update backlog
    uint64_t background_writes_failed = 0;
    uint64_t writes_failed_due_to_too_many_in_flight_hints = 0;
    uint64_t cas_write_unfinished_commit = 0;
    uint64_t cas_write_condition_not_met = 0;
    uint64_t cas_write_timeout_due_to_uncertainty = 0;
    uint64_t cas_failed_read_round_optimization = 0;
    uint16_t cas_now_pruning = 0;
    uint64_t cas_prune = 0;
    uint64_t cas_coordinator_dropped_prune = 0;
    uint64_t cas_replica_dropped_prune = 0;
    std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request

public:
    write_stats();
    write_stats(const sstring& category, bool auto_register_stats);

    void register_stats();
    void register_split_metrics_local();

protected:
    seastar::metrics::metric_groups _metrics;
};
struct stats : public write_stats {
    seastar::metrics::metric_groups _metrics;

    utils::timed_rate_moving_average read_timeouts;
    utils::timed_rate_moving_average read_unavailables;
    utils::timed_rate_moving_average read_rate_limited_by_replicas;
    utils::timed_rate_moving_average read_rate_limited_by_coordinator;
    utils::timed_rate_moving_average range_slice_timeouts;
    utils::timed_rate_moving_average range_slice_unavailables;
    utils::timed_rate_moving_average cas_read_timeouts;
    utils::timed_rate_moving_average cas_read_unavailables;
    utils::estimated_histogram cas_read_contention;

    uint64_t read_repair_attempts = 0;
    uint64_t read_repair_repaired_blocking = 0;
    uint64_t read_repair_repaired_background = 0;
    uint64_t global_read_repairs_canceled_due_to_concurrent_write = 0;
    // number of mutations received as a coordinator
    uint64_t received_mutations = 0;
    // number of counter updates received as a leader
    uint64_t received_counter_updates = 0;
    // number of forwarded mutations
    uint64_t forwarded_mutations = 0;
    uint64_t forwarding_errors = 0;
    // number of read requests received as a replica
    uint64_t replica_data_reads = 0;
    uint64_t replica_digest_reads = 0;
    uint64_t replica_mutation_data_reads = 0;
    uint64_t replica_cross_shard_ops = 0;

    utils::timed_rate_moving_average_and_histogram read;
    utils::timed_rate_moving_average_and_histogram range;
    utils::time_estimated_histogram estimated_read;
    utils::time_estimated_histogram estimated_range;
    utils::timed_rate_moving_average_and_histogram cas_read;
    utils::time_estimated_histogram estimated_cas_read;

    uint64_t reads = 0;
    uint64_t foreground_reads = 0; // client still waits for the read
    uint64_t read_retries = 0; // read is retried with new limit
    uint64_t speculative_digest_reads = 0;
    uint64_t speculative_data_reads = 0;
    uint64_t cas_read_unfinished_commit = 0;
    uint64_t cas_foreground = 0;
    uint64_t cas_total_running = 0;
    uint64_t cas_total_operations = 0;

    // Data read attempts
    split_stats data_read_attempts;
    split_stats data_read_completed;
    split_stats data_read_errors;
    // Digest read attempts
    split_stats digest_read_attempts;
    split_stats digest_read_completed;
    split_stats digest_read_errors;
    // Mutation data read attempts
    split_stats mutation_data_read_attempts;
    split_stats mutation_data_read_completed;
    split_stats mutation_data_read_errors;

public:
    stats();
    void register_stats();
    void register_split_metrics_local();
};
/**
 * This struct represents stats that have meaning (only or also) globally.
 * For example, background_write_bytes is used to decide whether to throttle
 * requests, so it makes little sense to check it per scheduling group. On
 * the other hand, a statistic whose value lies in figuring out how much
 * load each scheduling group generates on the system should be handled
 * elsewhere, i.e. in the write_stats struct.
 */
struct global_write_stats {
    seastar::metrics::metric_groups _metrics;
    uint64_t background_write_bytes = 0;
    uint64_t queued_write_bytes = 0;
    void register_stats();
};

/**
 * Following the convention of stats and write_stats.
 */
struct global_stats : public global_write_stats {
    void register_stats();
};
} // namespace storage_proxy_stats
} // namespace service