Compare commits
24 Commits
copilot/us
...
copilot/pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
173fb1e6d3 | ||
|
|
e252bb1550 | ||
|
|
5713b5efd1 | ||
|
|
979ec5ada8 | ||
|
|
67503a350b | ||
|
|
a90490c3cf | ||
|
|
6f957ea4e0 | ||
|
|
f6605f7b66 | ||
|
|
6cb263bab0 | ||
|
|
035aa90d4b | ||
|
|
40d180a7ef | ||
|
|
9de8d6798e | ||
|
|
a5df2e79a7 | ||
|
|
85d5073234 | ||
|
|
3e4e0c57b8 | ||
|
|
526e5986fe | ||
|
|
bc0952781a | ||
|
|
755d528135 | ||
|
|
d544d8602d | ||
|
|
313985fed7 | ||
|
|
4c4d043a3b | ||
|
|
3b9cd52a95 | ||
|
|
aca5284b13 | ||
|
|
29e0b4e08c |
35
.github/workflows/call_jira_sync.yml
vendored
35
.github/workflows/call_jira_sync.yml
vendored
@@ -1,8 +1,8 @@
|
||||
name: Sync Jira Based on PR Events
|
||||
name: Sync Jira Based on PR Events
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
|
||||
types: [opened, edited, ready_for_review, review_requested, labeled, unlabeled, closed]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
@@ -10,32 +10,9 @@ permissions:
|
||||
issues: write
|
||||
|
||||
jobs:
|
||||
jira-sync-pr-opened:
|
||||
if: github.event.action == 'opened'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-in-review:
|
||||
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-add-label:
|
||||
if: github.event.action == 'labeled'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-status-remove-label:
|
||||
if: github.event.action == 'unlabeled'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-status-pr-closed:
|
||||
if: github.event.action == 'closed'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
|
||||
jira-sync:
|
||||
uses: scylladb/github-automation/.github/workflows/main_pr_events_jira_sync.yml@main
|
||||
with:
|
||||
caller_action: ${{ github.event.action }}
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
@@ -3463,7 +3463,11 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
if (should_add_wcu) {
|
||||
rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
|
||||
}
|
||||
_stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
auto duration = std::chrono::steady_clock::now() - start_time;
|
||||
_stats.api_operations.batch_write_item_latency.mark(duration);
|
||||
for (const auto& w : per_table_wcu) {
|
||||
w.first->api_operations.batch_write_item_latency.mark(duration);
|
||||
}
|
||||
co_return rjson::print(std::move(ret));
|
||||
}
|
||||
|
||||
@@ -4974,7 +4978,12 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
if (!some_succeeded && eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::move(eptr));
|
||||
}
|
||||
_stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
auto duration = std::chrono::steady_clock::now() - start_time;
|
||||
_stats.api_operations.batch_get_item_latency.mark(duration);
|
||||
for (const table_requests& rs : requests) {
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->api_operations.batch_get_item_latency.mark(duration);
|
||||
}
|
||||
if (is_big(response)) {
|
||||
co_return make_streamed(std::move(response));
|
||||
} else {
|
||||
|
||||
@@ -122,9 +122,9 @@ future<> unset_thrift_controller(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_thrift_controller(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
|
||||
return ctx.http_server.set_routes([&ctx, &ss, &group0_client] (routes& r) {
|
||||
set_storage_service(ctx, r, ss, group0_client);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
|
||||
return ctx.http_server.set_routes([&ctx, &ss, &ssc, &group0_client] (routes& r) {
|
||||
set_storage_service(ctx, r, ss, ssc, group0_client);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -98,7 +98,7 @@ future<> set_server_config(http_context& ctx, db::config& cfg);
|
||||
future<> unset_server_config(http_context& ctx);
|
||||
future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
|
||||
future<> unset_server_snitch(http_context& ctx);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
|
||||
future<> unset_server_storage_service(http_context& ctx);
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
|
||||
future<> unset_server_client_routes(http_context& ctx);
|
||||
|
||||
@@ -835,9 +835,9 @@ rest_force_keyspace_flush(http_context& ctx, std::unique_ptr<http::request> req)
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_decommission(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
rest_decommission(sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, std::unique_ptr<http::request> req) {
|
||||
apilog.info("decommission");
|
||||
return ss.local().decommission().then([] {
|
||||
return ss.local().decommission(ssc).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
}
|
||||
@@ -1782,7 +1782,7 @@ rest_bind(FuncType func, BindArgs&... args) {
|
||||
return std::bind_front(func, std::ref(args)...);
|
||||
}
|
||||
|
||||
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
|
||||
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
|
||||
ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
|
||||
ss::toppartitions_generic.set(r, rest_bind(rest_toppartitions_generic, ctx));
|
||||
ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
|
||||
@@ -1799,7 +1799,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
|
||||
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
|
||||
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss));
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc));
|
||||
ss::move.set(r, rest_bind(rest_move, ss));
|
||||
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
|
||||
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
|
||||
@@ -2141,6 +2141,7 @@ void unset_snapshot(http_context& ctx, routes& r) {
|
||||
ss::start_backup.unset(r);
|
||||
cf::get_true_snapshots_size.unset(r);
|
||||
cf::get_all_true_snapshots_size.unset(r);
|
||||
ss::decommission.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ struct scrub_info {
|
||||
|
||||
scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::request> req);
|
||||
|
||||
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
|
||||
void unset_storage_service(http_context& ctx, httpd::routes& r);
|
||||
void set_sstables_loader(http_context& ctx, httpd::routes& r, sharded<sstables_loader>& sst_loader);
|
||||
void unset_sstables_loader(http_context& ctx, httpd::routes& r);
|
||||
|
||||
@@ -597,7 +597,6 @@ scylla_tests = set([
|
||||
'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
|
||||
'test/boost/logalloc_test',
|
||||
'test/boost/lru_string_map_test',
|
||||
'test/boost/lru_test',
|
||||
'test/boost/managed_bytes_test',
|
||||
'test/boost/managed_vector_test',
|
||||
'test/boost/map_difference_test',
|
||||
@@ -1584,7 +1583,6 @@ pure_boost_tests = set([
|
||||
'test/boost/like_matcher_test',
|
||||
'test/boost/linearizing_input_stream_test',
|
||||
'test/boost/lru_string_map_test',
|
||||
'test/boost/lru_test',
|
||||
'test/boost/map_difference_test',
|
||||
'test/boost/nonwrapping_interval_test',
|
||||
'test/boost/observable_test',
|
||||
|
||||
@@ -214,7 +214,11 @@ void cache_tracker::clear() {
|
||||
}
|
||||
|
||||
void cache_tracker::touch(rows_entry& e) {
|
||||
_lru.touch(e);
|
||||
// last dummy may not be linked if evicted
|
||||
if (e.is_linked()) {
|
||||
_lru.remove(e);
|
||||
}
|
||||
_lru.add(e);
|
||||
}
|
||||
|
||||
void cache_tracker::insert(cache_entry& entry) {
|
||||
|
||||
@@ -39,10 +39,19 @@ snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::stor
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::stop() {
|
||||
co_await _ops.close();
|
||||
co_await disable_all_operations();
|
||||
co_await _task_manager_module->stop();
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::disable_all_operations() {
|
||||
if (!_ops.is_closed()) {
|
||||
if (_ops.get_count()) {
|
||||
snap_log.info("Waiting for snapshot/backup tasks to finish");
|
||||
}
|
||||
co_await _ops.close();
|
||||
}
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
|
||||
auto& ks = _db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {
|
||||
|
||||
@@ -120,6 +120,8 @@ public:
|
||||
|
||||
future<int64_t> true_snapshots_size();
|
||||
future<int64_t> true_snapshots_size(sstring ks, sstring cf);
|
||||
|
||||
future<> disable_all_operations();
|
||||
private:
|
||||
config _config;
|
||||
sharded<replica::database>& _db;
|
||||
|
||||
@@ -292,8 +292,8 @@ For example::
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
|
||||
or columns provided in a definition of the index.
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
|
||||
See :ref:`WHERE <where-clause>`.
|
||||
|
||||
For example::
|
||||
|
||||
@@ -301,10 +301,6 @@ For example::
|
||||
WHERE user_id = 'user123'
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
The supported operations are equal relations (``=`` and ``IN``) with restrictions as in regular ``WHERE`` clauses. See :ref:`WHERE <where-clause>`.
|
||||
|
||||
Other filtering scenarios are currently not supported.
|
||||
|
||||
.. note::
|
||||
|
||||
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
|
||||
|
||||
@@ -1,111 +1,347 @@
|
||||
# Introduction
|
||||
# Prototype design: auditing all keyspaces and per-role auditing
|
||||
|
||||
Similar to the approach described in CASSANDRA-12151, we add the
|
||||
concept of an audit specification. An audit has a target (syslog or a
|
||||
table) and a set of events/actions that it wants recorded. We
|
||||
introduce new CQL syntax for Scylla users to describe and manipulate
|
||||
audit specifications.
|
||||
## Summary
|
||||
|
||||
Prior art:
|
||||
- Microsoft SQL Server [audit
|
||||
description](https://docs.microsoft.com/en-us/sql/relational-databases/security/auditing/sql-server-audit-database-engine?view=sql-server-ver15)
|
||||
- pgAudit [docs](https://github.com/pgaudit/pgaudit/blob/master/README.md)
|
||||
- MySQL audit_log docs in
|
||||
[MySQL](https://dev.mysql.com/doc/refman/8.0/en/audit-log.html) and
|
||||
[Azure](https://docs.microsoft.com/en-us/azure/mysql/concepts-audit-logs)
|
||||
- DynamoDB can [use CloudTrail](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/logging-using-cloudtrail.html) to log all events
|
||||
Extend the existing `scylla.yaml`-driven audit subsystem with two focused capabilities:
|
||||
|
||||
# CQL extensions
|
||||
1. allow auditing **all keyspaces** without enumerating them one by one
|
||||
2. allow auditing only a configured set of **roles**
|
||||
|
||||
## Create an audit
|
||||
The prototype should stay close to the current implementation in `audit/`:
|
||||
|
||||
```cql
|
||||
CREATE AUDIT [IF NOT EXISTS] audit-name WITH TARGET { SYSLOG | table-name }
|
||||
[ AND TRIGGER KEYSPACE IN (ks1, ks2, ks3) ]
|
||||
[ AND TRIGGER TABLE IN (tbl1, tbl2, tbl3) ]
|
||||
[ AND TRIGGER ROLE IN (usr1, usr2, usr3) ]
|
||||
[ AND TRIGGER CATEGORY IN (cat1, cat2, cat3) ]
|
||||
;
|
||||
- keep the existing backends (`table`, `syslog`, or both)
|
||||
- keep the existing category / keyspace / table filters
|
||||
- preserve live updates for audit configuration
|
||||
- avoid any schema change to `audit.audit_log`
|
||||
|
||||
This is intentionally a small extension of the current auditing model, not a redesign around new CQL statements such as `CREATE AUDIT`.
|
||||
|
||||
## Motivation
|
||||
|
||||
Today Scylla exposes three main audit selectors:
|
||||
|
||||
- `audit_categories`
|
||||
- `audit_tables`
|
||||
- `audit_keyspaces`
|
||||
|
||||
This leaves two operational gaps:
|
||||
|
||||
1. **Auditing all keyspaces is cumbersome.**
|
||||
Large installations may create keyspaces dynamically, or manage many tenant keyspaces. Requiring operators to keep
|
||||
`audit_keyspaces` synchronized with the full keyspace list is error-prone and defeats the point of cluster-wide auditing.
|
||||
2. **Auditing is all-or-nothing with respect to users.**
|
||||
Once a category/keyspace/table combination matches, any authenticated user generating that traffic is audited.
|
||||
Operators want to narrow the scope to specific tenants, service accounts, or privileged roles.
|
||||
|
||||
These two additions also work well together: "audit all keyspaces, but only for selected roles" is a practical way to reduce
|
||||
both audit volume and performance impact.
|
||||
|
||||
## Goals
|
||||
|
||||
- Add a way to express "all keyspaces" in the current configuration model.
|
||||
- Add a new role filter that limits auditing to selected roles.
|
||||
- Preserve backwards compatibility for existing configurations.
|
||||
- Keep the evaluation cheap on the request path.
|
||||
- Support live configuration updates, consistent with the existing audit options.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- Introducing `CREATE AUDIT`, `ALTER AUDIT`, or other new CQL syntax.
|
||||
- Adding per-role audit destinations.
|
||||
- Adding different categories per role.
|
||||
- Expanding role matching through the full granted-role graph in the prototype.
|
||||
- Changing the on-disk audit table schema.
|
||||
|
||||
## Current behavior
|
||||
|
||||
At the moment, audit logging is controlled by:
|
||||
|
||||
- `audit`
|
||||
- `audit_categories`
|
||||
- `audit_tables`
|
||||
- `audit_keyspaces`
|
||||
|
||||
The current decision rule in `audit::should_log()` is effectively:
|
||||
|
||||
```text
|
||||
category matches
|
||||
&& (
|
||||
keyspace is listed in audit_keyspaces
|
||||
|| table is listed in audit_tables
|
||||
|| category in {AUTH, ADMIN, DCL}
|
||||
)
|
||||
```
|
||||
|
||||
From this point on, every database event that matches all present
|
||||
triggers will be recorded in the target. When the target is a table,
|
||||
it behaves like the [current
|
||||
design](https://docs.scylladb.com/operating-scylla/security/auditing/#table-storage).
|
||||
Observations:
|
||||
|
||||
The audit name must be different from all other audits, unless IF NOT
|
||||
EXISTS precedes it, in which case the existing audit must be identical
|
||||
to the new definition. Case sensitivity and length limit are the same
|
||||
as for table names.
|
||||
- `AUTH`, `ADMIN`, and `DCL` are already global once their category is enabled.
|
||||
- `DDL`, `DML`, and `QUERY` need a matching keyspace or table.
|
||||
- An empty `audit_keyspaces` means "audit no keyspaces", not "audit every keyspace".
|
||||
- There is no role-based filter; the authenticated user is recorded in the log but is not part of the decision.
|
||||
- The exact implementation to preserve is in `audit/audit.cc` (`should_log()`, `inspect()`, and `inspect_login()`).
|
||||
|
||||
A trigger kind (ie, `KEYSPACE`, `TABLE`, `ROLE`, or `CATEGORY`) can be
|
||||
specified at most once.
|
||||
## Proposed configuration
|
||||
|
||||
## Show an audit
|
||||
### 1. Add `audit_all_keyspaces`
|
||||
|
||||
```cql
|
||||
DESCRIBE AUDIT [audit-name ...];
|
||||
Introduce a new live-update boolean option:
|
||||
|
||||
Examples:
|
||||
|
||||
```yaml
|
||||
# Audit all keyspaces for matching categories
|
||||
audit_all_keyspaces: true
|
||||
|
||||
# Audit all keyspaces for selected roles
|
||||
audit_all_keyspaces: true
|
||||
audit_roles: "alice,bob"
|
||||
```
|
||||
|
||||
Prints definitions of all audits named herein. If no names are
|
||||
provided, prints all audits.
|
||||
Semantics:
|
||||
|
||||
## Delete an audit
|
||||
- `audit_all_keyspaces: false` keeps the existing behavior.
|
||||
- `audit_all_keyspaces: true` makes every keyspace match.
|
||||
- `audit_keyspaces` keeps its existing meaning: an explicit list of keyspaces, or no keyspace-wide auditing when left empty.
|
||||
- `audit_all_keyspaces: true` and a non-empty `audit_keyspaces` must be rejected as invalid configuration,
|
||||
because the two options express overlapping scope in different ways.
|
||||
- A dedicated boolean is preferable to overloading `audit_keyspaces`, because it avoids changing the meaning of existing configurations.
|
||||
- This also keeps the behavior aligned with today's `audit_tables` handling, where leaving `audit_tables` empty does not introduce a new wildcard syntax.
|
||||
|
||||
```cql
|
||||
DROP AUDIT audit-name;
|
||||
### 2. Add `audit_roles`
|
||||
|
||||
Introduce a new live-update configuration option:
|
||||
|
||||
```yaml
|
||||
audit_roles: "alice,bob,service_api"
|
||||
```
|
||||
|
||||
Stops logging events specified by this audit. Doesn't impact the
|
||||
already logged events. If the target is a table, it remains as it is.
|
||||
Semantics:
|
||||
|
||||
## Alter an audit
|
||||
- empty `audit_roles` means **no role filtering**, preserving today's behavior
|
||||
- non-empty `audit_roles` means audit only requests whose effective logged username matches one of the configured roles
|
||||
- matching is byte-for-byte exact, using the same role name that is already written to the audit record's `username` column / syslog field
|
||||
- the prototype should compare against the post-authentication role name from the session and audit log,
|
||||
with no additional case folding or role-graph expansion
|
||||
|
||||
```cql
|
||||
ALTER AUDIT audit-name WITH {same syntax as CREATE}
|
||||
Examples:
|
||||
|
||||
```yaml
|
||||
# Audit all roles in a single keyspace (current behavior, made explicit)
|
||||
audit_keyspaces: "ks1"
|
||||
audit_roles: ""
|
||||
|
||||
# Audit two roles across all keyspaces
|
||||
audit_all_keyspaces: true
|
||||
audit_roles: "alice,bob"
|
||||
|
||||
# Audit a service role, but only for selected tables
|
||||
audit_tables: "ks1.orders,ks1.payments"
|
||||
audit_roles: "billing_service"
|
||||
```
|
||||
|
||||
Any trigger provided will be updated (or newly created, if previously
|
||||
absent). To drop a trigger, use `IN *`.
|
||||
## Decision rule after the change
|
||||
|
||||
## Permissions
|
||||
After the prototype, the rule becomes:
|
||||
|
||||
Only superusers can modify audits or turn them on and off.
|
||||
```text
|
||||
category matches
|
||||
&& role matches
|
||||
&& (
|
||||
category in {AUTH, ADMIN, DCL}
|
||||
|| audit_all_keyspaces
|
||||
|| keyspace is listed in audit_keyspaces
|
||||
|| table is listed in audit_tables
|
||||
)
|
||||
```
|
||||
|
||||
Only superusers can read tables that are audit targets; no user can
|
||||
modify them. Only superusers can drop tables that are audit targets,
|
||||
after the audit itself is dropped. If a superuser doesn't drop a
|
||||
target table, it remains in existence indefinitely.
|
||||
Where:
|
||||
|
||||
# Implementation
|
||||
- `role matches` is always true when `audit_roles` is empty
|
||||
- `audit_all_keyspaces` is true when the new boolean option is enabled
|
||||
|
||||
## Efficient trigger evaluation
|
||||
For login auditing, the rule is simply:
|
||||
|
||||
```text
|
||||
AUTH category enabled && role matches(login username)
|
||||
```
|
||||
|
||||
## Implementation details
|
||||
|
||||
### Configuration parsing
|
||||
|
||||
Add a new config entry:
|
||||
|
||||
- `db::config::audit_all_keyspaces`
|
||||
- `db::config::audit_roles`
|
||||
|
||||
It should mirror the existing audit selectors:
|
||||
|
||||
- `audit_all_keyspaces`: type `named_value<bool>`, liveness `LiveUpdate`, default `false`
|
||||
- `audit_roles`: type `named_value<sstring>`, liveness `LiveUpdate`, default empty string
|
||||
|
||||
Parsing changes:
|
||||
|
||||
- keep `parse_audit_tables()` as-is
|
||||
- keep `parse_audit_keyspaces()` semantics as-is
|
||||
- add `parse_audit_roles()` that returns a set of role names
|
||||
- normalize empty or whitespace-only keyspace lists to an empty configuration rather than treating them as real keyspace names
|
||||
- add cross-field validation so `audit_all_keyspaces: true` cannot be combined with a non-empty
|
||||
`audit_keyspaces`, both at startup and during live updates
|
||||
|
||||
To avoid re-parsing on every request, the `audit::audit` service should store:
|
||||
|
||||
```c++
|
||||
namespace audit {
|
||||
|
||||
/// Stores triggers from an AUDIT statement.
|
||||
class triggers {
|
||||
// Use trie structures for speedy string lookup.
|
||||
optional<trie> _ks_trigger, _tbl_trigger, _usr_trigger;
|
||||
|
||||
// A logical-AND filter.
|
||||
optional<unsigned> _cat_trigger;
|
||||
|
||||
public:
|
||||
/// True iff every non-null trigger matches the corresponding ainf element.
|
||||
bool should_audit(const audit_info& ainf);
|
||||
};
|
||||
|
||||
} // namespace audit
|
||||
bool _audit_all_keyspaces;
|
||||
std::set<sstring> _audited_keyspaces;
|
||||
std::set<sstring> _audited_roles;
|
||||
```
|
||||
|
||||
To prevent modification of target tables, `audit::inspect()` will
|
||||
check the statement and throw if it is disallowed, similar to what
|
||||
`check_access()` currently does.
|
||||
Using a dedicated boolean keeps the hot-path check straightforward and avoids reinterpreting the existing
|
||||
`_audited_keyspaces` selector.
|
||||
|
||||
## Persisting audit definitions
|
||||
Using `std::set` for the explicit selectors keeps the prototype aligned with the current implementation and minimizes code churn.
|
||||
If profiling later shows lookup cost matters here, the container choice can be revisited independently of the feature semantics.
|
||||
|
||||
Obviously, an audit definition must survive a server restart and stay
|
||||
consistent among all nodes in a cluster. We'll accomplish both by
|
||||
storing audits in a system table.
|
||||
### Audit object changes
|
||||
|
||||
The current `audit_info` already carries:
|
||||
|
||||
- category
|
||||
- keyspace
|
||||
- table
|
||||
- query text
|
||||
|
||||
The username is available separately from `service::query_state` and is already passed to storage helpers when an entry is written.
|
||||
For the prototype there is no need to duplicate the username into `audit_info`.
|
||||
|
||||
Instead:
|
||||
|
||||
- change `should_log()` to take the effective username as an additional input
|
||||
- change `should_log_login()` to check the username against `audit_roles`
|
||||
- keep the storage helpers unchanged, because they already persist the username
|
||||
- update the existing internal call sites in `inspect()` and `inspect_login()` to pass the username through
|
||||
|
||||
One possible interface shape is:
|
||||
|
||||
```c++
|
||||
bool should_log(std::string_view username, const audit_info* info) const;
|
||||
bool should_log_login(std::string_view username) const;
|
||||
```
|
||||
|
||||
### Role semantics
|
||||
|
||||
For the prototype, "role" means the role name already associated with the current client session:
|
||||
|
||||
- successful authenticated sessions use the session's user name
|
||||
- failed login events use the login name from the authentication attempt
|
||||
- failed login events are still subject to `audit_roles`, matched against the attempted login name
|
||||
|
||||
This keeps the feature easy to explain and aligns the filter with what users already see in audit output.
|
||||
|
||||
The prototype should **not** try to expand inherited roles. If a user logs in as `alice` and inherits permissions from another role,
|
||||
the audit filter still matches `alice`. This keeps the behavior deterministic and avoids expensive role graph lookups on the request path.
|
||||
|
||||
### Keyspace semantics
|
||||
|
||||
`audit_all_keyspaces: true` should affect any statement whose `audit_info` carries a keyspace name.
|
||||
|
||||
Important consequences:
|
||||
|
||||
- it makes `DDL` / `DML` / `QUERY` auditing effectively cluster-wide
|
||||
- it does not change the existing global handling of `AUTH`, `ADMIN`, and `DCL`
|
||||
- statements that naturally have no keyspace name continue to depend on their category-specific behavior
|
||||
|
||||
No extra schema or metadata scan is required: the request already carries the keyspace information needed for the decision.
|
||||
|
||||
## Backwards compatibility
|
||||
|
||||
This design keeps existing behavior intact:
|
||||
|
||||
- existing clusters that do not set `audit_roles` continue to audit all roles
|
||||
- existing clusters that leave `audit_keyspaces` empty continue to audit no keyspaces
|
||||
- existing explicit keyspace/table lists keep their current meaning
|
||||
|
||||
The feature is enabled only by a new explicit boolean, so existing `audit_keyspaces` values do not need to be reinterpreted.
|
||||
The only newly-invalid combination is enabling `audit_all_keyspaces` while also listing explicit keyspaces.
|
||||
|
||||
## Operational considerations
|
||||
|
||||
### Performance and volume
|
||||
|
||||
`audit_all_keyspaces: true` can significantly increase audit volume, especially with `QUERY` and `DML`.
|
||||
|
||||
The intended mitigation is to combine it with:
|
||||
|
||||
- a narrow `audit_categories`
|
||||
- a narrow `audit_roles`
|
||||
|
||||
That combination gives operators a simple and cheap filter model:
|
||||
|
||||
- first by category
|
||||
- then by role
|
||||
- then by keyspace/table scope
|
||||
|
||||
### Live updates
|
||||
|
||||
`audit_roles` should follow the same live-update behavior as the current audit filters.
|
||||
|
||||
Changing:
|
||||
|
||||
- `audit_roles`
|
||||
- `audit_all_keyspaces`
|
||||
- `audit_keyspaces`
|
||||
- `audit_tables`
|
||||
- `audit_categories`
|
||||
|
||||
should update the in-memory selectors on all shards without restarting the node.
|
||||
|
||||
### Prototype limitation
|
||||
|
||||
Because matching is done against the authenticated session role name, `audit_roles` cannot express "audit everyone who inherits role X".
|
||||
Operators must list the concrete login roles they want to audit. This is a deliberate trade-off in the prototype to keep matching cheap
|
||||
and avoid role graph lookups on every audited request.
|
||||
|
||||
Example: if `alice` inherits permissions from `admin_role`, configuring `audit_roles: "admin_role"` would not audit requests from
|
||||
`alice`; to audit those requests, `alice` itself must be listed.
|
||||
|
||||
### Audit table schema
|
||||
|
||||
No schema change is needed. The audit table already includes `username`, which is sufficient for both storage and later analysis.
|
||||
|
||||
## Testing plan
|
||||
|
||||
The prototype should extend existing audit coverage rather than introduce a separate test framework.
|
||||
|
||||
### Parser / unit coverage
|
||||
|
||||
Add focused tests for:
|
||||
|
||||
- empty `audit_roles`
|
||||
- specific `audit_roles`
|
||||
- `audit_all_keyspaces: true`
|
||||
- invalid mixed configuration: `audit_all_keyspaces: true` with non-empty `audit_keyspaces`
|
||||
- empty or whitespace-only keyspace lists such as `",,,"` or `" "`, which should normalize to an empty configuration and therefore audit no keyspaces
|
||||
- boolean config parsing for `audit_all_keyspaces`
|
||||
|
||||
### Behavioral coverage
|
||||
|
||||
Extend the existing audit tests in `test/cluster/dtest/audit_test.py` with scenarios such as:
|
||||
|
||||
1. `audit_all_keyspaces: true` audits statements in multiple keyspaces without listing them explicitly
|
||||
2. `audit_roles: "alice"` logs requests from `alice` but not from `bob`
|
||||
3. `audit_all_keyspaces: true` + `audit_roles: "alice"` only logs `alice`'s traffic cluster-wide
|
||||
4. login auditing respects `audit_roles`
|
||||
5. live-updating `audit_roles` changes behavior without restart
|
||||
6. setting `audit_all_keyspaces: true` together with explicit `audit_keyspaces` is rejected with a clear error
|
||||
|
||||
## Future evolution
|
||||
|
||||
This prototype is deliberately small, but it fits a broader audit-spec design if we decide to revisit that later.
|
||||
|
||||
In a future CQL-driven design, these two additions map naturally to triggers such as:
|
||||
|
||||
- `TRIGGER KEYSPACE IN *`
|
||||
- `TRIGGER ROLE IN (...)`
|
||||
|
||||
That means the prototype is not throwaway work: it improves the current operational model immediately while keeping a clean path
|
||||
toward richer audit objects in the future.
|
||||
|
||||
@@ -10,27 +10,7 @@ Cache is always paired with its underlying mutation source which it mirrors. Tha
|
||||
|
||||
Eviction is about removing parts of the data from memory and recording the fact that information about those parts is missing. Eviction doesn't change the set of writes represented by cache as part of its `mutation_source` interface.
|
||||
|
||||
The smallest object which can be evicted, called eviction unit, is currently a single row (`rows_entry`). Eviction units are managed by a W-TinyLFU policy owned by a `cache_tracker`. The W-TinyLFU policy determines eviction order. It is shared among many tables. Currently, there is one per `database`.
|
||||
|
||||
### W-TinyLFU Eviction Policy
|
||||
|
||||
The cache uses a W-TinyLFU (Window Tiny Least Frequently Used) eviction policy,
|
||||
which combines recency and frequency information for better hit rates than plain LRU.
|
||||
|
||||
The policy organizes entries into three segments:
|
||||
|
||||
- **Window** (~1% of cache): A small LRU that admits all new entries. This allows
|
||||
new entries to build up frequency information before competing for main cache space.
|
||||
- **Probation** (~19% of cache): Part of the main SLRU cache. Entries from the window
|
||||
compete with probation victims for admission using a TinyLFU frequency filter.
|
||||
- **Protected** (~80% of cache): The other part of the main SLRU cache. Entries are
|
||||
promoted here from probation when accessed again.
|
||||
|
||||
The TinyLFU frequency filter uses a Count-Min Sketch to compactly estimate access
|
||||
frequency. When eviction is needed, the window victim competes with the probation
|
||||
victim: the entry with higher estimated frequency survives in probation while the
|
||||
other is evicted. The sketch is periodically aged (all counts halved) to adapt to
|
||||
changing access patterns.
|
||||
The smallest object which can be evicted, called eviction unit, is currently a single row (`rows_entry`). Eviction units are linked in an LRU owned by a `cache_tracker`. The LRU determines eviction order. The LRU is shared among many tables. Currently, there is one per `database`.
|
||||
|
||||
All `rows_entry` objects which are owned by a `cache_tracker` are assumed to be either contained in a cache (in some `row_cache::partitions_type`) or
|
||||
be owned by a (detached) `partition_snapshot`. When the last row from a `partition_entry` is evicted, the containing `cache_entry` is evicted from the cache.
|
||||
|
||||
2
main.cc
2
main.cc
@@ -2236,7 +2236,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return m.start();
|
||||
}).get();
|
||||
|
||||
api::set_server_storage_service(ctx, ss, group0_client).get();
|
||||
api::set_server_storage_service(ctx, ss, snapshot_ctl, group0_client).get();
|
||||
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
|
||||
api::unset_server_storage_service(ctx).get();
|
||||
});
|
||||
|
||||
@@ -371,7 +371,7 @@ public:
|
||||
}
|
||||
|
||||
void on_preemptive_aborted() {
|
||||
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
|
||||
if (_state != reader_permit::state::waiting_for_admission) {
|
||||
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
|
||||
}
|
||||
|
||||
@@ -1533,19 +1533,24 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
// + permit.timeout() < db::no_timeout -- to avoid preemptively aborting reads without timeout.
|
||||
// Useful is tests when _preemptive_abort_factor is set to 1.0
|
||||
// to avoid additional sleeps to wait for the read to be shed.
|
||||
const auto time_budget = permit.timeout() - permit.created();
|
||||
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
|
||||
if (remaining_time > db::timeout_clock::duration::zero() &&
|
||||
permit.timeout() < db::no_timeout &&
|
||||
remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
permit.on_preemptive_aborted();
|
||||
using ms = std::chrono::milliseconds;
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
|
||||
_name,
|
||||
std::chrono::duration_cast<ms>(time_budget - remaining_time),
|
||||
std::chrono::duration_cast<ms>(time_budget),
|
||||
_preemptive_abort_factor());
|
||||
continue;
|
||||
//
|
||||
// Only apply to permits waiting for admission -- permits waiting for memory are already
|
||||
// executing reads and should not be preemptively aborted.
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_admission) {
|
||||
const auto time_budget = permit.timeout() - permit.created();
|
||||
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
|
||||
if (remaining_time > db::timeout_clock::duration::zero() &&
|
||||
permit.timeout() < db::no_timeout &&
|
||||
remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
permit.on_preemptive_aborted();
|
||||
using ms = std::chrono::milliseconds;
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
|
||||
_name,
|
||||
std::chrono::duration_cast<ms>(time_budget - remaining_time),
|
||||
std::chrono::duration_cast<ms>(time_budget),
|
||||
_preemptive_abort_factor());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
|
||||
@@ -2362,6 +2362,15 @@ static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
|
||||
}
|
||||
}
|
||||
|
||||
static future<repair_rows_on_wire> clone_gently(const repair_rows_on_wire& rows) {
|
||||
repair_rows_on_wire cloned;
|
||||
for (const auto& row : rows) {
|
||||
cloned.push_back(row);
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
}
|
||||
co_return cloned;
|
||||
}
|
||||
|
||||
static future<> repair_put_row_diff_with_rpc_stream_process_op(
|
||||
sharded<repair_service>& repair,
|
||||
locator::host_id from,
|
||||
@@ -2388,7 +2397,9 @@ static future<> repair_put_row_diff_with_rpc_stream_process_op(
|
||||
co_await rm->put_row_diff_handler(std::move(*fp));
|
||||
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
|
||||
} else {
|
||||
co_await rm->put_row_diff_handler(*fp);
|
||||
// Gently clone to avoid copy stall on destination shard
|
||||
repair_rows_on_wire local_rows = co_await clone_gently(*fp);
|
||||
co_await seastar::when_all_succeed(rm->put_row_diff_handler(std::move(local_rows)), utils::clear_gently(fp));
|
||||
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -2794,12 +2794,18 @@ future<> storage_service::raft_decommission() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::decommission() {
|
||||
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
future<> storage_service::decommission(sharded<db::snapshot_ctl>& snapshot_ctl) {
|
||||
return run_with_api_lock(sstring("decommission"), [&] (storage_service& ss) {
|
||||
return seastar::async([&] {
|
||||
if (ss._operation_mode != mode::NORMAL) {
|
||||
throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
|
||||
}
|
||||
|
||||
snapshot_ctl.invoke_on_all([](auto& sctl) {
|
||||
return sctl.disable_all_operations();
|
||||
}).get();
|
||||
slogger.info("DECOMMISSIONING: disabled backup and snapshots");
|
||||
|
||||
ss.raft_decommission().get();
|
||||
|
||||
ss.stop_transport().get();
|
||||
|
||||
@@ -77,6 +77,7 @@ namespace db {
|
||||
class system_distributed_keyspace;
|
||||
class system_keyspace;
|
||||
class batchlog_manager;
|
||||
class snapshot_ctl;
|
||||
namespace view {
|
||||
class view_builder;
|
||||
class view_building_worker;
|
||||
@@ -666,7 +667,7 @@ private:
|
||||
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const;
|
||||
|
||||
public:
|
||||
future<> decommission();
|
||||
future<> decommission(sharded<db::snapshot_ctl>&);
|
||||
|
||||
private:
|
||||
future<> unbootstrap();
|
||||
|
||||
@@ -471,14 +471,17 @@ def test_streams_operations(test_table_s, dynamodbstreams, metrics):
|
||||
# to update latencies for one kind of operation (#17616, and compare #9406),
|
||||
# and to do that checking that ..._count increases for that op is enough.
|
||||
@contextmanager
|
||||
def check_sets_latency(metrics, operation_names):
|
||||
def check_sets_latency_by_metric(metrics, operation_names, metric_name):
|
||||
the_metrics = get_metrics(metrics)
|
||||
saved_latency_count = { x: get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': x}, the_metrics) for x in operation_names }
|
||||
saved_latency_count = { x: get_metric(metrics, f'{metric_name}_count', {'op': x}, the_metrics) for x in operation_names }
|
||||
yield
|
||||
the_metrics = get_metrics(metrics)
|
||||
for op in operation_names:
|
||||
# The total "count" on all shards should strictly increase
|
||||
assert saved_latency_count[op] < get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': op}, the_metrics)
|
||||
assert saved_latency_count[op] < get_metric(metrics, f'{metric_name}_count', {'op': op}, the_metrics)
|
||||
|
||||
def check_sets_latency(metrics, operation_names):
|
||||
return check_sets_latency_by_metric(metrics, operation_names, 'scylla_alternator_op_latency')
|
||||
|
||||
# Test latency metrics for PutItem, GetItem, DeleteItem, UpdateItem.
|
||||
# We can't check what exactly the latency is - just that it gets updated.
|
||||
@@ -494,6 +497,18 @@ def test_item_latency(test_table_s, metrics):
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
def test_item_latency_per_table(test_table_s, metrics):
|
||||
with check_sets_latency_by_metric(metrics, ['DeleteItem', 'GetItem', 'PutItem', 'UpdateItem', 'BatchWriteItem', 'BatchGetItem'], 'scylla_alternator_table_op_latency'):
|
||||
p = random_string()
|
||||
test_table_s.put_item(Item={'p': p})
|
||||
test_table_s.get_item(Key={'p': p})
|
||||
test_table_s.delete_item(Key={'p': p})
|
||||
test_table_s.update_item(Key={'p': p})
|
||||
test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
# Test latency metrics for GetRecords. Other Streams-related operations -
|
||||
# ListStreams, DescribeStream, and GetShardIterator, have an operation
|
||||
# count (tested above) but do NOT currently have a latency histogram.
|
||||
|
||||
@@ -1,287 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_MODULE lru
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
|
||||
#include "utils/count_min_sketch.hh"
|
||||
#include "utils/lru.hh"
|
||||
|
||||
// A concrete evictable for testing.
|
||||
struct test_evictable final : public evictable {
|
||||
int id;
|
||||
bool was_evicted = false;
|
||||
|
||||
explicit test_evictable(int id) : id(id) {}
|
||||
|
||||
void on_evicted() noexcept override {
|
||||
was_evicted = true;
|
||||
}
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Count-Min Sketch Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Width = 2^test_sketch_width_log2 = 1024 counters per row.
|
||||
static constexpr size_t test_sketch_width_log2 = 10;
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_count_min_sketch_basic) {
|
||||
utils::count_min_sketch sketch(test_sketch_width_log2);
|
||||
|
||||
// An unseen key should have estimate 0.
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 0);
|
||||
|
||||
sketch.increment(42);
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 1);
|
||||
|
||||
sketch.increment(42);
|
||||
sketch.increment(42);
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 3);
|
||||
|
||||
// A different key should be independent.
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(100), 0);
|
||||
sketch.increment(100);
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(100), 1);
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 3);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_count_min_sketch_max_counter) {
|
||||
utils::count_min_sketch sketch(test_sketch_width_log2);
|
||||
|
||||
for (int i = 0; i < 20; ++i) {
|
||||
sketch.increment(1);
|
||||
}
|
||||
// 4-bit counter caps at 15.
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 15);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_count_min_sketch_decay) {
|
||||
utils::count_min_sketch sketch(test_sketch_width_log2);
|
||||
|
||||
sketch.increment(1);
|
||||
sketch.increment(1);
|
||||
sketch.increment(1);
|
||||
sketch.increment(1); // freq = 4
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 4);
|
||||
|
||||
sketch.decay(); // halve → 2
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 2);
|
||||
|
||||
sketch.decay(); // halve → 1
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 1);
|
||||
|
||||
sketch.decay(); // halve → 0
|
||||
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// W-TinyLFU LRU Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_add_and_evict) {
|
||||
lru l;
|
||||
test_evictable e1(1), e2(2), e3(3);
|
||||
|
||||
l.add(e1);
|
||||
l.add(e2);
|
||||
l.add(e3);
|
||||
|
||||
BOOST_REQUIRE(e1.is_linked());
|
||||
BOOST_REQUIRE(e2.is_linked());
|
||||
BOOST_REQUIRE(e3.is_linked());
|
||||
|
||||
// Evict removes at least one entry.
|
||||
auto r = l.evict();
|
||||
BOOST_REQUIRE(r == seastar::memory::reclaiming_result::reclaimed_something);
|
||||
|
||||
// At least one entry should have been evicted.
|
||||
int evicted_count = (e1.was_evicted ? 1 : 0) + (e2.was_evicted ? 1 : 0) + (e3.was_evicted ? 1 : 0);
|
||||
BOOST_REQUIRE_GE(evicted_count, 1);
|
||||
|
||||
// Clean up remaining linked entries.
|
||||
if (e1.is_linked()) l.remove(e1);
|
||||
if (e2.is_linked()) l.remove(e2);
|
||||
if (e3.is_linked()) l.remove(e3);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_evict_empty) {
|
||||
lru l;
|
||||
auto r = l.evict();
|
||||
BOOST_REQUIRE(r == seastar::memory::reclaiming_result::reclaimed_nothing);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_touch_keeps_entry_alive) {
|
||||
lru l;
|
||||
|
||||
// Create entries with different access patterns.
|
||||
test_evictable hot(1), cold1(2), cold2(3);
|
||||
|
||||
l.add(hot);
|
||||
l.add(cold1);
|
||||
l.add(cold2);
|
||||
|
||||
// Touch 'hot' many times to build frequency.
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
l.touch(hot);
|
||||
}
|
||||
|
||||
// Evict all - the hot entry may survive longer than cold entries.
|
||||
l.evict();
|
||||
l.evict();
|
||||
|
||||
// Hot entry should still be linked (survived eviction of cold entries).
|
||||
BOOST_REQUIRE(hot.is_linked());
|
||||
|
||||
// Clean up.
|
||||
l.remove(hot);
|
||||
// cold entries may or may not still be linked, clean up if needed.
|
||||
if (cold1.is_linked()) l.remove(cold1);
|
||||
if (cold2.is_linked()) l.remove(cold2);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_evict_all) {
|
||||
lru l;
|
||||
test_evictable e1(1), e2(2), e3(3);
|
||||
|
||||
l.add(e1);
|
||||
l.add(e2);
|
||||
l.add(e3);
|
||||
|
||||
l.evict_all();
|
||||
|
||||
BOOST_REQUIRE(!e1.is_linked());
|
||||
BOOST_REQUIRE(!e2.is_linked());
|
||||
BOOST_REQUIRE(!e3.is_linked());
|
||||
BOOST_REQUIRE(e1.was_evicted);
|
||||
BOOST_REQUIRE(e2.was_evicted);
|
||||
BOOST_REQUIRE(e3.was_evicted);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_remove) {
|
||||
lru l;
|
||||
test_evictable e1(1), e2(2), e3(3);
|
||||
|
||||
l.add(e1);
|
||||
l.add(e2);
|
||||
l.add(e3);
|
||||
|
||||
l.remove(e2);
|
||||
BOOST_REQUIRE(!e2.is_linked());
|
||||
BOOST_REQUIRE(!e2.was_evicted); // remove does not call on_evicted
|
||||
|
||||
l.evict_all();
|
||||
BOOST_REQUIRE(e1.was_evicted);
|
||||
BOOST_REQUIRE(e3.was_evicted);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_add_before) {
|
||||
lru l;
|
||||
test_evictable e1(1), e2(2), e3(3);
|
||||
|
||||
l.add(e1);
|
||||
l.add(e2);
|
||||
|
||||
// Insert e3 before e2 so e3 is evicted before e2.
|
||||
l.add_before(e2, e3);
|
||||
|
||||
BOOST_REQUIRE(e1.is_linked());
|
||||
BOOST_REQUIRE(e2.is_linked());
|
||||
BOOST_REQUIRE(e3.is_linked());
|
||||
|
||||
// Clean up.
|
||||
l.evict_all();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_frequency_based_eviction) {
|
||||
lru l;
|
||||
|
||||
// Create entries with different access patterns.
|
||||
// Use a fixed-size array to avoid move construction issues.
|
||||
static constexpr int N = 20;
|
||||
std::unique_ptr<test_evictable> entries[N];
|
||||
for (int i = 0; i < N; ++i) {
|
||||
entries[i] = std::make_unique<test_evictable>(i);
|
||||
}
|
||||
|
||||
for (int i = 0; i < N; ++i) {
|
||||
l.add(*entries[i]);
|
||||
}
|
||||
|
||||
// Touch entries 15-19 many times (they should be "hot").
|
||||
for (int round = 0; round < 10; ++round) {
|
||||
for (int i = 15; i < N; ++i) {
|
||||
l.touch(*entries[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Evict half the entries.
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
l.evict();
|
||||
}
|
||||
|
||||
// Hot entries (15-19) should still be linked.
|
||||
for (int i = 15; i < N; ++i) {
|
||||
BOOST_REQUIRE_MESSAGE(entries[i]->is_linked(),
|
||||
"Hot entry " << i << " should survive eviction");
|
||||
}
|
||||
|
||||
// Clean up remaining entries.
|
||||
for (int i = 0; i < N; ++i) {
|
||||
if (entries[i]->is_linked()) {
|
||||
l.remove(*entries[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_lru_touch_promotes_from_probation) {
|
||||
lru l;
|
||||
|
||||
// Create entries.
|
||||
static constexpr int N = 10;
|
||||
std::unique_ptr<test_evictable> entries[N];
|
||||
for (int i = 0; i < N; ++i) {
|
||||
entries[i] = std::make_unique<test_evictable>(i);
|
||||
}
|
||||
for (int i = 0; i < N; ++i) {
|
||||
l.add(*entries[i]);
|
||||
}
|
||||
|
||||
// Evict and re-add some to force entries into probation via the eviction logic.
|
||||
// The eviction drains excess from window to probation.
|
||||
// After enough evictions, remaining entries should be in probation or protected.
|
||||
|
||||
// Touch entries 0-4 multiple times to build frequency.
|
||||
for (int round = 0; round < 5; ++round) {
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
l.touch(*entries[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Evict 5 entries - cold entries (5-9) should be evicted.
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
l.evict();
|
||||
}
|
||||
|
||||
// Entries 0-4 (frequently touched) should survive.
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
BOOST_REQUIRE_MESSAGE(entries[i]->is_linked(),
|
||||
"Frequently touched entry " << i << " should survive eviction");
|
||||
}
|
||||
|
||||
// Clean up.
|
||||
for (int i = 0; i < N; ++i) {
|
||||
if (entries[i]->is_linked()) {
|
||||
l.remove(*entries[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2409,6 +2409,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_aborted_permit) {
|
||||
simple_schema s;
|
||||
const auto schema = s.schema();
|
||||
const std::string test_name = get_name();
|
||||
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
// Ensure permits are shed immediately during admission.
|
||||
@@ -2421,27 +2425,24 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_ab
|
||||
|
||||
// Set a ridiculously long timeout to ensure permit will not be rejected due to timeout
|
||||
auto timeout = db::timeout_clock::now() + 60min;
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, timeout, {}).get();
|
||||
auto permit1 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
|
||||
auto units1 = permit1.request_memory(2024).get();
|
||||
|
||||
auto permit2_units1 = permit2.request_memory(1024).get();
|
||||
auto permit1_units = permit1.request_memory(8 * 1024).get();
|
||||
reader_permit_opt permit2_holder;
|
||||
auto permit2_fut = semaphore.with_permit(schema, test_name.c_str(), 1024, timeout, {}, permit2_holder, [] (reader_permit) {
|
||||
BOOST_FAIL("unexpected call to with permit lambda");
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
// permit1 is now the blessed one
|
||||
// Triggers maybe_admit_waiters()
|
||||
units1.reset_to_zero();
|
||||
|
||||
auto permit2_units2_fut = permit2.request_memory(1024);
|
||||
BOOST_REQUIRE(!permit2_units2_fut.available());
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1);
|
||||
BOOST_REQUIRE(eventually_true([&] { return permit2_fut.failed(); }));
|
||||
BOOST_REQUIRE_THROW(permit2_fut.get(), named_semaphore_aborted);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
|
||||
|
||||
permit1_units.reset_to_zero();
|
||||
|
||||
const auto futures_failed = eventually_true([&] { return permit2_units2_fut.failed(); });
|
||||
BOOST_CHECK(futures_failed);
|
||||
BOOST_CHECK_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
|
||||
|
||||
simple_schema ss;
|
||||
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(ss.schema(), permit2));
|
||||
BOOST_REQUIRE_THROW(permit2_units2_fut.get(), named_semaphore_aborted);
|
||||
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(schema, *permit2_holder));
|
||||
BOOST_CHECK(!irh);
|
||||
}
|
||||
|
||||
/// Test that if no count resources are currently used, a single permit is always admitted regardless of available memory.
|
||||
@@ -2555,4 +2556,43 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
|
||||
}
|
||||
}
|
||||
|
||||
// Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-1016
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_preemptive_abort_requested_memory_leak) {
|
||||
const ssize_t memory = 1024;
|
||||
const uint32_t serialize_limit_multiplier = 2;
|
||||
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(),
|
||||
2, // count
|
||||
memory,
|
||||
100, // max queue length
|
||||
utils::updateable_value(serialize_limit_multiplier),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()), // kill limit multiplier
|
||||
utils::updateable_value<uint32_t>(1), // cpu concurrency
|
||||
utils::updateable_value<float>(1.0f)); // preemptive abort factor
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, "permit1", memory/2, db::no_timeout, {}).get();
|
||||
reader_permit_opt permit2 = semaphore.obtain_permit(nullptr, "permit2", memory/2, db::timeout_clock::now() + 60s, {}).get();
|
||||
|
||||
auto units1 = permit1.request_memory(memory * serialize_limit_multiplier).get();
|
||||
auto mem_fut = permit2->request_memory(1024);
|
||||
BOOST_REQUIRE(!mem_fut.available());
|
||||
|
||||
// Triggers maybe_admit_waiters()
|
||||
units1.reset_to_zero();
|
||||
|
||||
// Consume mem_fut to properly account for the 1024 bytes consumed by
|
||||
// on_granted_memory(). In debug mode, the .then() continuation that creates
|
||||
// the resource_units may be deferred due to yielding, so we must .get() it
|
||||
// to ensure the resource_units is created and can be properly destroyed.
|
||||
{ auto u = mem_fut.get(); }
|
||||
|
||||
// on_granted_memory() consumes stale _requested_memory (1024) + 512,
|
||||
// but resource_units only tracks 512 — the difference leaks.
|
||||
{ auto u = permit2->request_memory(512).get(); }
|
||||
|
||||
// Shouldn't fail if SCYLLADB-1016 is fixed.
|
||||
permit2 = {};
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -61,7 +61,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
|
||||
session = c.connect()
|
||||
logging.info("Performing SELECT(*) FROM system.clients")
|
||||
res = session.execute("SELECT COUNT(*) FROM system.clients WHERE connection_stage = 'AUTHENTICATING' ALLOW FILTERING;")
|
||||
count = res[0][0]
|
||||
count = res.one()[0]
|
||||
logging.info(f"Observed {count} AUTHENTICATING connections...")
|
||||
if count >= num_connections/2:
|
||||
connections_observed = True
|
||||
|
||||
@@ -11,7 +11,7 @@ import pytest
|
||||
import time
|
||||
import random
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.manager_client import ManagerClient, ServerInfo
|
||||
from test.cluster.object_store.conftest import format_tuples
|
||||
from test.cluster.util import wait_for_cql_and_get_hosts, get_replication, new_test_keyspace
|
||||
from test.pylib.rest_client import read_barrier
|
||||
@@ -19,6 +19,7 @@ from test.pylib.util import unique_name, wait_all
|
||||
from cassandra.cluster import ConsistencyLevel
|
||||
from collections import defaultdict
|
||||
from test.pylib.util import wait_for
|
||||
from test.pylib.rest_client import HTTPError
|
||||
import statistics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -174,9 +175,8 @@ async def test_backup_endpoint_config_is_live_updateable(manager: ManagerClient,
|
||||
assert status is not None
|
||||
assert status['state'] == 'done'
|
||||
|
||||
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
async def do_test_backup_helper(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, handler, num_servers: int = 1):
|
||||
'''helper for backup abort testing'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
@@ -186,10 +186,9 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
server = (await manager.servers_add(num_servers, config=cfg, cmdline=cmd))[0]
|
||||
ks, cf = create_ks_and_cf(manager.get_cql())
|
||||
snap_name, files = await take_snapshot_on_one_server(ks, server, manager, logger)
|
||||
assert len(files) > 1
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
|
||||
@@ -206,26 +205,35 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
|
||||
print(f'Started task {tid}, aborting it early')
|
||||
await log.wait_for(breakpoint_name + ': waiting', from_mark=mark)
|
||||
await manager.api.abort_task(server.ip_addr, tid)
|
||||
await manager.api.message_injection(server.ip_addr, breakpoint_name)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
print(f'Status: {status}')
|
||||
assert (status is not None) and (status['state'] == 'failed')
|
||||
assert "seastar::abort_requested_exception (abort requested)" in status['error']
|
||||
await handler(server, prefix, files, tid)
|
||||
|
||||
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
|
||||
uploaded_count = 0
|
||||
for f in files:
|
||||
in_backup = f'{prefix}/{f}' in objects
|
||||
print(f'Check {f} is in backup: {in_backup}')
|
||||
if in_backup:
|
||||
uploaded_count += 1
|
||||
# Note: since s3 client is abortable and run async, we might fail even the first file
|
||||
# regardless of if we set the abort status before or after the upload is initiated.
|
||||
# Parallelism is a pain.
|
||||
assert min_files <= uploaded_count < len(files)
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
'''helper for backup abort testing'''
|
||||
|
||||
async def abort_and_check(server, prefix, files, tid):
|
||||
assert len(files) > 1
|
||||
await manager.api.abort_task(server.ip_addr, tid)
|
||||
await manager.api.message_injection(server.ip_addr, breakpoint_name)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
print(f'Status: {status}')
|
||||
assert (status is not None) and (status['state'] == 'failed')
|
||||
assert "seastar::abort_requested_exception (abort requested)" in status['error']
|
||||
|
||||
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
|
||||
uploaded_count = 0
|
||||
for f in files:
|
||||
in_backup = f'{prefix}/{f}' in objects
|
||||
print(f'Check {f} is in backup: {in_backup}')
|
||||
if in_backup:
|
||||
uploaded_count += 1
|
||||
# Note: since s3 client is abortable and run async, we might fail even the first file
|
||||
# regardless of if we set the abort status before or after the upload is initiated.
|
||||
# Parallelism is a pain.
|
||||
assert min_files <= uploaded_count < len(files)
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
|
||||
await do_test_backup_helper(manager, object_storage, breakpoint_name, abort_and_check)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
@@ -456,7 +464,8 @@ class topo:
|
||||
self.racks = racks
|
||||
self.dcs = dcs
|
||||
|
||||
async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, object_storage=None):
|
||||
async def create_cluster(topology, manager, logger, object_storage=None):
|
||||
rf_rack_valid_keyspaces = (topology.rf <= topology.racks)
|
||||
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
|
||||
|
||||
cfg = {'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
|
||||
@@ -679,19 +688,19 @@ class SSTablesOnObjectStorage:
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology_rf_validity", [
|
||||
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
|
||||
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
|
||||
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
|
||||
@pytest.mark.parametrize("topology", [
|
||||
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
|
||||
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
|
||||
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
|
||||
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
|
||||
])
|
||||
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology_rf_validity):
|
||||
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology):
|
||||
'''Check that restoring of a cluster with stream scopes works'''
|
||||
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnObjectStorage(object_storage))
|
||||
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnObjectStorage(object_storage))
|
||||
|
||||
|
||||
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity, sstables_storage):
|
||||
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology, sstables_storage):
|
||||
'''
|
||||
This test creates a cluster specified by the topology parameter above,
|
||||
configurable number of nodes, tacks, datacenters, and replication factor.
|
||||
@@ -707,9 +716,7 @@ async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topo
|
||||
This stage parses the logs and checks that the data was streamed to nodes within the configured scope.
|
||||
'''
|
||||
|
||||
topology, rf_rack_valid_keyspaces = topology_rf_validity
|
||||
|
||||
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, sstables_storage.object_storage)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger, sstables_storage.object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
@@ -858,7 +865,8 @@ async def test_backup_broken_streaming(manager: ManagerClient, s3_storage):
|
||||
|
||||
res = cql.execute(f"SELECT COUNT(*) FROM {keyspace}.{table} BYPASS CACHE USING TIMEOUT 600s;")
|
||||
|
||||
assert res[0].count == expected_rows, f"number of rows after restore is incorrect: {res[0].count}"
|
||||
row = res.one()
|
||||
assert row.count == expected_rows, f"number of rows after restore is incorrect: {row.count}"
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
await log.wait_for("fully contained SSTables to local node from object storage", timeout=10)
|
||||
# just make sure we had partially contained sstables as well
|
||||
@@ -878,7 +886,7 @@ async def test_restore_primary_replica_same_domain(manager: ManagerClient, objec
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
@@ -940,7 +948,7 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
servers, host_ids = await create_cluster(topology, True if domain == 'rack' else False, manager, logger, object_storage)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
@@ -971,3 +979,39 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
|
||||
streamed_to = set([ r[1].group(1) for r in res ])
|
||||
logger.info(f'{s.ip_addr} {host_ids[s.server_id]} streamed to {streamed_to}')
|
||||
assert len(streamed_to) == 2
|
||||
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_decommision_waits_for_backup(manager: ManagerClient, object_storage):
|
||||
'''check that backing up a snapshot for a keyspace blocks decommission'''
|
||||
|
||||
async def decommission_and_check(server: ServerInfo, prefix: str, files, tid):
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
async def finish_backup():
|
||||
# wait for snapshot to stop on waiting for backup
|
||||
await log.wait_for("Waiting for snapshot/backup tasks to finish", from_mark=mark)
|
||||
|
||||
mark2 = await log.mark()
|
||||
|
||||
# let the backup run and finish
|
||||
await manager.api.message_injection(server.ip_addr, "backup_task_pre_upload")
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
|
||||
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
|
||||
uploaded_count = 0
|
||||
# all files should be uploaded. note: can be zero due to two nodes
|
||||
for f in files:
|
||||
in_backup = f'{prefix}/{f}' in objects
|
||||
print(f'Check {f} is in backup: {in_backup}')
|
||||
if in_backup:
|
||||
uploaded_count += 1
|
||||
assert uploaded_count == len(files)
|
||||
# Now wait for decommission to finish
|
||||
await log.wait_for("DECOMMISSIONING: disabled backup and snapshots", from_mark=mark2)
|
||||
|
||||
await asyncio.gather(manager.decommission_node(server.server_id), finish_backup())
|
||||
|
||||
await do_test_backup_helper(manager, object_storage, "backup_task_pre_upload", decommission_and_check, 2)
|
||||
|
||||
|
||||
@@ -44,7 +44,6 @@ async def test_gossiper_empty_self_id_on_shadow_round(manager: ManagerClient):
|
||||
|
||||
logging.info("Starting cluster normally")
|
||||
node1 = await manager.server_add(cmdline=cmdline, start=False, config=cfg)
|
||||
manager.server_add(cmdline=cmdline, start=False)
|
||||
node1_log = await manager.server_open_log(node1.server_id)
|
||||
node2 = await manager.server_add(cmdline=cmdline, start=False, seeds=[node1.ip_addr])
|
||||
node2_log = await manager.server_open_log(node2.server_id)
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
from test.pylib.internal_types import ServerNum
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot, InjectionHandler, read_barrier
|
||||
from test.cluster.util import create_new_test_keyspace
|
||||
@@ -19,6 +20,20 @@ def fixture_raft_op_timeout(build_mode):
|
||||
return 10000 if build_mode == 'debug' else 1000
|
||||
|
||||
|
||||
async def update_group0_raft_op_timeout(server_id: ServerNum, manager: ManagerClient, timeout: int) -> None:
|
||||
logger.info(f"Updating group0_raft_op_timeout_in_ms on server {server_id} to {timeout}")
|
||||
running_ids = [srv.server_id for srv in await manager.running_servers()]
|
||||
if server_id in running_ids:
|
||||
# If the node is alive, server_update_config only sends the SIGHUP signal to the Scylla process, so awaiting it
|
||||
# doesn't guarantee that the new config file is active. Work around this by looking at the logs.
|
||||
log_file = await manager.server_open_log(server_id)
|
||||
mark = await log_file.mark()
|
||||
await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout)
|
||||
await log_file.wait_for("completed re-reading configuration file", from_mark=mark, timeout=60)
|
||||
else:
|
||||
await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
|
||||
@@ -41,7 +56,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
|
||||
|
||||
config = {
|
||||
'direct_failure_detector_ping_timeout_in_ms': 300,
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -63,6 +77,10 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
|
||||
manager.server_stop_gracefully(servers[3].server_id),
|
||||
manager.server_stop_gracefully(servers[4].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout)
|
||||
for srv in servers[:2]))
|
||||
|
||||
logger.info("starting a sixth node with no quorum")
|
||||
await manager.server_add(expected_error="raft operation \\[read_barrier\\] timed out, there is no raft quorum",
|
||||
timeout=60)
|
||||
@@ -75,7 +93,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
|
||||
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
|
||||
async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_timeout: int) -> None:
|
||||
config = {
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -106,6 +123,9 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time
|
||||
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
|
||||
manager.server_stop_gracefully(servers[2].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
|
||||
|
||||
logger.info("release join-node-before-add-entry injection")
|
||||
await injection_handler.message()
|
||||
|
||||
@@ -125,7 +145,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
|
||||
|
||||
logger.info("adding a fourth node")
|
||||
servers += [await manager.server_add(config={
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -152,6 +171,9 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
|
||||
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
|
||||
manager.server_stop_gracefully(servers[2].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await update_group0_raft_op_timeout(servers[3].server_id, manager, raft_op_timeout)
|
||||
|
||||
logger.info("release join-node-response_handler-before-read-barrier injection")
|
||||
injection_handler = InjectionHandler(manager.api,
|
||||
'join-node-response_handler-before-read-barrier',
|
||||
@@ -168,7 +190,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
|
||||
async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: int) -> None:
|
||||
logger.info("starting a first node (the leader)")
|
||||
servers = [await manager.server_add(config={
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -188,6 +209,9 @@ async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: in
|
||||
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
|
||||
manager.server_stop_gracefully(servers[2].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
|
||||
|
||||
logger.info("attempting removenode for the second node")
|
||||
await manager.remove_node(servers[0].server_id, servers[1].server_id,
|
||||
expected_error="raft operation [read_barrier] timed out, there is no raft quorum",
|
||||
@@ -231,9 +255,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
|
||||
await asyncio.gather(*(manager.server_stop(srv.server_id) for srv in servers))
|
||||
|
||||
# This ensures the read barriers below fail quickly without group 0 quorum.
|
||||
logger.info(f"Decreasing group0_raft_op_timeout_in_ms on {servers}")
|
||||
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', raft_op_timeout)
|
||||
for srv in servers))
|
||||
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout) for srv in servers))
|
||||
|
||||
logger.info(f"Restarting {servers[:2]} with no group 0 quorum")
|
||||
for idx, srv in enumerate(servers[:2]):
|
||||
@@ -245,8 +267,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
|
||||
|
||||
# Increase the timeout back to 300s to ensure the new group 0 leader is elected before the first read barrier below
|
||||
# times out.
|
||||
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', 300000)
|
||||
for srv in servers))
|
||||
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, 300000) for srv in servers))
|
||||
|
||||
logger.info(f"Restarting {servers[2:]} with group 0 quorum")
|
||||
for srv in servers[2:]:
|
||||
|
||||
@@ -75,16 +75,16 @@ class SSTablesOnLocalStorage:
|
||||
await asyncio.gather(*(self.refresh_one(manager, s, ks, cf, sstables, scope, primary_replica_only) for s, sstables in sstables_per_server.items()))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology_rf_validity", [
|
||||
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
|
||||
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
|
||||
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
|
||||
@pytest.mark.parametrize("topology", [
|
||||
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
|
||||
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
|
||||
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
|
||||
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
|
||||
])
|
||||
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
|
||||
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology):
|
||||
'''Check that refreshing of a cluster with stream scopes works'''
|
||||
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnLocalStorage())
|
||||
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnLocalStorage())
|
||||
|
||||
|
||||
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
@@ -94,7 +94,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
|
||||
topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)
|
||||
|
||||
servers, host_ids = await create_cluster(topology, True, manager, logger)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ async def test_snapshot_on_all_nodes(manager: ManagerClient):
|
||||
"""
|
||||
topology = topo(rf = 3, nodes = 3, racks = 3, dcs = 1)
|
||||
|
||||
servers, _ = await create_cluster(topology, True, manager, logger)
|
||||
servers, _ = await create_cluster(topology, manager, logger)
|
||||
|
||||
snapshot_name = unique_name('snap_')
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ async def test_tombstone_gc_insert_flush(manager: ManagerClient):
|
||||
async def test_tablet_manual_repair_all_tokens(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
|
||||
token = "all"
|
||||
now = datetime.datetime.utcnow()
|
||||
now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
|
||||
map1 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
|
||||
|
||||
await guarantee_repair_time_next_second()
|
||||
|
||||
@@ -1324,7 +1324,7 @@ async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, n
|
||||
return servers
|
||||
|
||||
|
||||
class TestContext:
|
||||
class Context:
|
||||
def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
|
||||
self.ks = ks
|
||||
self.table = table
|
||||
@@ -1347,7 +1347,7 @@ async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial
|
||||
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
|
||||
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
|
||||
yield TestContext(ks, table, rf, initial_tablets, num_keys)
|
||||
yield Context(ks, table, rf, initial_tablets, num_keys)
|
||||
finally:
|
||||
await cql.run_async(f"DROP KEYSPACE {ks}")
|
||||
|
||||
|
||||
@@ -53,6 +53,11 @@ filterwarnings =
|
||||
ignore::DeprecationWarning:importlib._bootstrap
|
||||
ignore::DeprecationWarning:botocore
|
||||
ignore::DeprecationWarning:pytest_elk_reporter
|
||||
ignore::sqlalchemy.exc.MovedIn20Warning:kmip.pie.sqltypes
|
||||
ignore::cryptography.utils.CryptographyDeprecationWarning:kmip
|
||||
ignore:TypeDecorator .* will not produce a cache key:sqlalchemy.exc.SAWarning:kmip
|
||||
ignore:Exception in thread[\s\S]*kmip[\s\S]*shutdown:pytest.PytestUnhandledThreadExceptionWarning
|
||||
|
||||
|
||||
tmp_path_retention_count = 1
|
||||
tmp_path_retention_policy = failed
|
||||
|
||||
@@ -1,108 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstddef>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
|
||||
namespace utils {
|
||||
|
||||
/// A Count-Min Sketch with 4-bit counters for frequency estimation.
|
||||
///
|
||||
/// Used by the W-TinyLFU cache admission policy to estimate access frequency.
|
||||
/// Each counter is 4 bits (max value 15), and counters are packed 16 per
|
||||
/// uint64_t word. The sketch uses 4 independent hash functions (rows) and
|
||||
/// returns the minimum count across all rows for frequency estimation.
|
||||
class count_min_sketch {
|
||||
static constexpr size_t depth = 4;
|
||||
static constexpr uint64_t reset_mask = 0x7777777777777777ULL;
|
||||
|
||||
// Hash seeds from splitmix64 sequence, chosen for low correlation between rows.
|
||||
static constexpr uint64_t seeds[depth] = {
|
||||
0x9e3779b97f4a7c15ULL,
|
||||
0xbf58476d1ce4e5b9ULL,
|
||||
0x94d049bb133111ebULL,
|
||||
0xd6e8feb86659fd93ULL,
|
||||
};
|
||||
|
||||
std::vector<uint64_t> _table;
|
||||
size_t _width;
|
||||
size_t _width_mask;
|
||||
size_t _words_per_row;
|
||||
|
||||
static uint64_t mix(uint64_t key, uint64_t seed) noexcept {
|
||||
uint64_t h = key * seed;
|
||||
h ^= h >> 32;
|
||||
h *= 0xd6e8feb86659fd93ULL;
|
||||
h ^= h >> 32;
|
||||
return h;
|
||||
}
|
||||
|
||||
size_t counter_index(size_t row, uint64_t key) const noexcept {
|
||||
return mix(key, seeds[row]) & _width_mask;
|
||||
}
|
||||
|
||||
static uint8_t get_counter(uint64_t word, size_t pos) noexcept {
|
||||
return (word >> (pos * 4)) & 0x0FULL;
|
||||
}
|
||||
|
||||
size_t word_index(size_t row, size_t col) const noexcept {
|
||||
return row * _words_per_row + col / 16;
|
||||
}
|
||||
|
||||
public:
|
||||
/// Construct a sketch with the given number of counters per row.
|
||||
/// \param width_log2 Log base 2 of the number of counters per row.
|
||||
/// Total memory is approximately depth * 2^width_log2 / 2 bytes.
|
||||
explicit count_min_sketch(size_t width_log2 = 16)
|
||||
: _width(size_t(1) << width_log2)
|
||||
, _width_mask(_width - 1)
|
||||
, _words_per_row(_width / 16)
|
||||
{
|
||||
_table.resize(depth * _words_per_row, 0);
|
||||
}
|
||||
|
||||
void increment(uint64_t key) noexcept {
|
||||
for (size_t row = 0; row < depth; ++row) {
|
||||
size_t col = counter_index(row, key);
|
||||
size_t wi = word_index(row, col);
|
||||
size_t pos = col & 15;
|
||||
uint8_t val = get_counter(_table[wi], pos);
|
||||
if (val < 15) {
|
||||
_table[wi] += (1ULL << (pos * 4));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t estimate(uint64_t key) const noexcept {
|
||||
uint8_t min_val = 15;
|
||||
for (size_t row = 0; row < depth; ++row) {
|
||||
size_t col = counter_index(row, key);
|
||||
size_t wi = word_index(row, col);
|
||||
size_t pos = col & 15;
|
||||
min_val = std::min(min_val, get_counter(_table[wi], pos));
|
||||
}
|
||||
return min_val;
|
||||
}
|
||||
|
||||
/// Halve all counters (frequency decay / aging).
|
||||
/// This is NOT a full clear — it preserves relative frequency ordering
|
||||
/// while allowing the sketch to adapt to changing access patterns.
|
||||
void decay() noexcept {
|
||||
for (auto& word : _table) {
|
||||
word = (word >> 1) & reset_mask;
|
||||
}
|
||||
}
|
||||
|
||||
size_t width() const noexcept { return _width; }
|
||||
};
|
||||
|
||||
} // namespace utils
|
||||
308
utils/lru.hh
308
utils/lru.hh
@@ -9,18 +9,8 @@
|
||||
#pragma once
|
||||
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/count_min_sketch.hh"
|
||||
#include <boost/intrusive/list.hpp>
|
||||
#include <seastar/core/memory.hh>
|
||||
#include <algorithm>
|
||||
|
||||
// Identifies which W-TinyLFU segment an evictable belongs to.
|
||||
enum class lru_segment : uint8_t {
|
||||
none = 0,
|
||||
window = 1,
|
||||
probation = 2,
|
||||
protected_ = 3,
|
||||
};
|
||||
|
||||
class evictable {
|
||||
friend class lru;
|
||||
@@ -42,11 +32,6 @@ protected:
|
||||
static_assert(std::is_nothrow_constructible_v<lru_link_type, lru_link_type&&>);
|
||||
private:
|
||||
lru_link_type _lru_link;
|
||||
// Stable key for frequency estimation in the Count-Min Sketch.
|
||||
// Assigned when the entry is first added to the LRU, preserved across LSA
|
||||
// compaction moves (which change the object address but preserve members).
|
||||
uint32_t _frequency_hash = 0;
|
||||
lru_segment _segment = lru_segment::none;
|
||||
protected:
|
||||
// Prevent destruction via evictable pointer. LRU is not aware of allocation strategy.
|
||||
// Prevent destruction of a linked evictable. While we could unlink the evictable here
|
||||
@@ -69,8 +54,6 @@ public:
|
||||
|
||||
void swap(evictable& o) noexcept {
|
||||
_lru_link.swap_nodes(o._lru_link);
|
||||
std::swap(_frequency_hash, o._frequency_hash);
|
||||
std::swap(_segment, o._segment);
|
||||
}
|
||||
|
||||
virtual bool is_index() const noexcept {
|
||||
@@ -93,27 +76,13 @@ class index_evictable : public evictable {
|
||||
}
|
||||
};
|
||||
|
||||
// Implements W-TinyLFU cache replacement for row cache and sstable index cache.
|
||||
//
|
||||
// W-TinyLFU uses a small admission window backed by an LRU and a main cache
|
||||
// organized as a Segmented LRU (SLRU) with probation and protected segments.
|
||||
// Admission to the main cache is controlled by a TinyLFU frequency filter
|
||||
// implemented via a Count-Min Sketch.
|
||||
//
|
||||
// New entries enter the window. When eviction is needed, the window victim
|
||||
// competes with the probation victim: the entry with higher estimated
|
||||
// frequency survives in probation while the other is evicted.
|
||||
// Touching an entry in probation promotes it to the protected segment.
|
||||
// When the protected segment exceeds its target size, the least-recently-used
|
||||
// protected entry is demoted back to probation.
|
||||
// Implements LRU cache replacement for row cache and sstable index cache.
|
||||
class lru {
|
||||
private:
|
||||
using lru_type = boost::intrusive::list<evictable,
|
||||
boost::intrusive::member_hook<evictable, evictable::lru_link_type, &evictable::_lru_link>,
|
||||
boost::intrusive::constant_time_size<false>>; // we need this to have bi::auto_unlink on hooks.
|
||||
lru_type _window;
|
||||
lru_type _probation;
|
||||
lru_type _protected;
|
||||
lru_type _list;
|
||||
|
||||
// See the comment to index_evictable.
|
||||
using index_lru_type = boost::intrusive::list<index_evictable,
|
||||
@@ -123,225 +92,24 @@ private:
|
||||
|
||||
using reclaiming_result = seastar::memory::reclaiming_result;
|
||||
|
||||
static constexpr size_t sketch_width_log2 = 16;
|
||||
static constexpr size_t sketch_width = size_t(1) << sketch_width_log2;
|
||||
static constexpr size_t sample_threshold = sketch_width * 10;
|
||||
// Window segment target: ~1% of total cache entries.
|
||||
static constexpr size_t window_percent = 1;
|
||||
// Protected segment target: ~80% of total cache entries.
|
||||
static constexpr size_t protected_percent = 80;
|
||||
|
||||
utils::count_min_sketch _sketch{sketch_width_log2};
|
||||
size_t _window_size = 0;
|
||||
size_t _probation_size = 0;
|
||||
size_t _protected_size = 0;
|
||||
size_t _sample_count = 0;
|
||||
// Monotonic counter for assigning stable frequency hash keys.
|
||||
// Using object addresses as sketch keys is incorrect because LSA
|
||||
// relocates objects during compaction, changing their address.
|
||||
uint32_t _next_hash = 0;
|
||||
|
||||
size_t total_size() const noexcept {
|
||||
return _window_size + _probation_size + _protected_size;
|
||||
}
|
||||
|
||||
size_t max_window_size() const noexcept {
|
||||
return std::max(size_t(1), total_size() * window_percent / 100);
|
||||
}
|
||||
|
||||
size_t max_protected_size() const noexcept {
|
||||
return total_size() * protected_percent / 100;
|
||||
}
|
||||
|
||||
static uint64_t entry_key(const evictable& e) noexcept {
|
||||
return e._frequency_hash;
|
||||
}
|
||||
|
||||
void assign_frequency_hash(evictable& e) noexcept {
|
||||
// Only assign a new hash if the entry doesn't have one yet.
|
||||
// Re-added entries (after remove()) keep their existing hash
|
||||
// to preserve frequency tracking across remove/add cycles.
|
||||
if (e._frequency_hash == 0) {
|
||||
// Skip 0 on wrap-around to keep it as the "unassigned" sentinel.
|
||||
if (++_next_hash == 0) {
|
||||
++_next_hash;
|
||||
}
|
||||
e._frequency_hash = _next_hash;
|
||||
}
|
||||
}
|
||||
|
||||
void record_access(const evictable& e) noexcept {
|
||||
_sketch.increment(entry_key(e));
|
||||
if (++_sample_count >= sample_threshold) {
|
||||
_sketch.decay();
|
||||
_sample_count /= 2;
|
||||
}
|
||||
}
|
||||
|
||||
lru_type& segment_list(lru_segment seg) noexcept {
|
||||
switch (seg) {
|
||||
case lru_segment::none:
|
||||
SCYLLA_ASSERT(false && "segment_list called with none");
|
||||
__builtin_unreachable();
|
||||
case lru_segment::window: return _window;
|
||||
case lru_segment::probation: return _probation;
|
||||
case lru_segment::protected_: return _protected;
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
void increment_size(lru_segment seg) noexcept {
|
||||
switch (seg) {
|
||||
case lru_segment::none: break;
|
||||
case lru_segment::window: ++_window_size; break;
|
||||
case lru_segment::probation: ++_probation_size; break;
|
||||
case lru_segment::protected_: ++_protected_size; break;
|
||||
}
|
||||
}
|
||||
|
||||
void decrement_size(lru_segment seg) noexcept {
|
||||
switch (seg) {
|
||||
case lru_segment::none: break;
|
||||
case lru_segment::window: --_window_size; break;
|
||||
case lru_segment::probation: --_probation_size; break;
|
||||
case lru_segment::protected_: --_protected_size; break;
|
||||
}
|
||||
}
|
||||
|
||||
void remove_from_segment(evictable& e) noexcept {
|
||||
auto& list = segment_list(e._segment);
|
||||
list.erase(list.iterator_to(e));
|
||||
decrement_size(e._segment);
|
||||
e._segment = lru_segment::none;
|
||||
}
|
||||
|
||||
void add_to_segment(evictable& e, lru_segment seg) noexcept {
|
||||
e._segment = seg;
|
||||
segment_list(seg).push_back(e);
|
||||
increment_size(seg);
|
||||
}
|
||||
|
||||
// Move excess protected entries to probation.
|
||||
// Bounded to avoid reactor stalls when the protected segment is
|
||||
// significantly oversized (e.g. after many promotions without eviction).
|
||||
static constexpr size_t max_rebalance_per_call = 128;
|
||||
void rebalance_protected() noexcept {
|
||||
size_t max_prot = max_protected_size();
|
||||
size_t moved = 0;
|
||||
while (_protected_size > max_prot && !_protected.empty() && moved < max_rebalance_per_call) {
|
||||
evictable& victim = _protected.front();
|
||||
remove_from_segment(victim);
|
||||
add_to_segment(victim, lru_segment::probation);
|
||||
++moved;
|
||||
}
|
||||
}
|
||||
|
||||
// Evicts a single element using W-TinyLFU policy.
|
||||
template <bool Shallow = false>
|
||||
reclaiming_result do_evict(bool should_evict_index) noexcept {
|
||||
// Index eviction path: evict the least recently used index entry.
|
||||
if (should_evict_index && !_index_list.empty()) {
|
||||
evictable& e = _index_list.front();
|
||||
remove(e);
|
||||
if constexpr (!Shallow) {
|
||||
e.on_evicted();
|
||||
} else {
|
||||
e.on_evicted_shallow();
|
||||
}
|
||||
return reclaiming_result::reclaimed_something;
|
||||
}
|
||||
|
||||
if (_window.empty() && _probation.empty() && _protected.empty()) {
|
||||
return reclaiming_result::reclaimed_nothing;
|
||||
}
|
||||
|
||||
rebalance_protected();
|
||||
|
||||
// Drain excess from window using TinyLFU admission.
|
||||
while (_window_size > max_window_size() && !_window.empty()) {
|
||||
evictable& w_victim = _window.front();
|
||||
|
||||
if (!_probation.empty()) {
|
||||
// Competition: window victim vs. probation victim.
|
||||
evictable& p_victim = _probation.front();
|
||||
uint8_t w_freq = _sketch.estimate(entry_key(w_victim));
|
||||
uint8_t p_freq = _sketch.estimate(entry_key(p_victim));
|
||||
|
||||
if (w_freq >= p_freq) {
|
||||
// Admit window victim to probation; evict probation victim.
|
||||
remove_from_segment(w_victim);
|
||||
add_to_segment(w_victim, lru_segment::probation);
|
||||
remove(p_victim);
|
||||
if constexpr (!Shallow) {
|
||||
p_victim.on_evicted();
|
||||
} else {
|
||||
p_victim.on_evicted_shallow();
|
||||
}
|
||||
} else {
|
||||
// Reject window victim.
|
||||
remove(w_victim);
|
||||
if constexpr (!Shallow) {
|
||||
w_victim.on_evicted();
|
||||
} else {
|
||||
w_victim.on_evicted_shallow();
|
||||
}
|
||||
}
|
||||
return reclaiming_result::reclaimed_something;
|
||||
}
|
||||
|
||||
// Probation is empty: move window victim to probation and retry.
|
||||
remove_from_segment(w_victim);
|
||||
add_to_segment(w_victim, lru_segment::probation);
|
||||
}
|
||||
|
||||
// Window is within target. Evict from probation, then window, then protected.
|
||||
evictable* victim = nullptr;
|
||||
if (!_probation.empty()) {
|
||||
victim = &_probation.front();
|
||||
} else if (!_window.empty()) {
|
||||
victim = &_window.front();
|
||||
} else if (!_protected.empty()) {
|
||||
victim = &_protected.front();
|
||||
} else {
|
||||
return reclaiming_result::reclaimed_nothing;
|
||||
}
|
||||
remove(*victim);
|
||||
if constexpr (!Shallow) {
|
||||
victim->on_evicted();
|
||||
} else {
|
||||
victim->on_evicted_shallow();
|
||||
}
|
||||
return reclaiming_result::reclaimed_something;
|
||||
}
|
||||
|
||||
public:
|
||||
~lru() {
|
||||
auto drain = [this](lru_type& list) {
|
||||
while (!list.empty()) {
|
||||
evictable& e = list.front();
|
||||
remove(e);
|
||||
e.on_evicted();
|
||||
}
|
||||
};
|
||||
drain(_window);
|
||||
drain(_probation);
|
||||
drain(_protected);
|
||||
while (!_list.empty()) {
|
||||
evictable& e = _list.front();
|
||||
remove(e);
|
||||
e.on_evicted();
|
||||
}
|
||||
}
|
||||
|
||||
void remove(evictable& e) noexcept {
|
||||
auto& list = segment_list(e._segment);
|
||||
list.erase(list.iterator_to(e));
|
||||
decrement_size(e._segment);
|
||||
e._segment = lru_segment::none;
|
||||
_list.erase(_list.iterator_to(e));
|
||||
if (e.is_index()) {
|
||||
_index_list.erase(_index_list.iterator_to(static_cast<index_evictable&>(e)));
|
||||
}
|
||||
}
|
||||
|
||||
void add(evictable& e) noexcept {
|
||||
assign_frequency_hash(e);
|
||||
record_access(e);
|
||||
add_to_segment(e, lru_segment::window);
|
||||
_list.push_back(e);
|
||||
if (e.is_index()) {
|
||||
_index_list.push_back(static_cast<index_evictable&>(e));
|
||||
}
|
||||
@@ -349,52 +117,36 @@ public:
|
||||
|
||||
// Like add(e) but makes sure that e is evicted right before "more_recent" in the absence of later touches.
|
||||
void add_before(evictable& more_recent, evictable& e) noexcept {
|
||||
assign_frequency_hash(e);
|
||||
record_access(e);
|
||||
lru_segment seg = more_recent._segment;
|
||||
auto& list = segment_list(seg);
|
||||
list.insert(list.iterator_to(more_recent), e);
|
||||
e._segment = seg;
|
||||
increment_size(seg);
|
||||
_list.insert(_list.iterator_to(more_recent), e);
|
||||
}
|
||||
|
||||
// Handles access to an entry:
|
||||
// - In window: moves to back of window.
|
||||
// - In probation: promotes to protected.
|
||||
// - In protected: moves to back of protected.
|
||||
// - Not linked: adds to window.
|
||||
void touch(evictable& e) noexcept {
|
||||
record_access(e);
|
||||
|
||||
switch (e._segment) {
|
||||
case lru_segment::none:
|
||||
assign_frequency_hash(e);
|
||||
add_to_segment(e, lru_segment::window);
|
||||
break;
|
||||
case lru_segment::window:
|
||||
_window.erase(_window.iterator_to(e));
|
||||
_window.push_back(e);
|
||||
break;
|
||||
case lru_segment::probation:
|
||||
_probation.erase(_probation.iterator_to(e));
|
||||
--_probation_size;
|
||||
e._segment = lru_segment::protected_;
|
||||
_protected.push_back(e);
|
||||
++_protected_size;
|
||||
break;
|
||||
case lru_segment::protected_:
|
||||
_protected.erase(_protected.iterator_to(e));
|
||||
_protected.push_back(e);
|
||||
break;
|
||||
}
|
||||
remove(e);
|
||||
add(e);
|
||||
}
|
||||
|
||||
// Evicts a single element using the W-TinyLFU policy.
|
||||
// Evicts a single element from the LRU
|
||||
template <bool Shallow = false>
|
||||
reclaiming_result do_evict(bool should_evict_index) noexcept {
|
||||
if (_list.empty()) {
|
||||
return reclaiming_result::reclaimed_nothing;
|
||||
}
|
||||
evictable& e = (should_evict_index && !_index_list.empty()) ? _index_list.front() : _list.front();
|
||||
remove(e);
|
||||
if constexpr (!Shallow) {
|
||||
e.on_evicted();
|
||||
} else {
|
||||
e.on_evicted_shallow();
|
||||
}
|
||||
return reclaiming_result::reclaimed_something;
|
||||
}
|
||||
|
||||
// Evicts a single element from the LRU.
|
||||
reclaiming_result evict(bool should_evict_index = false) noexcept {
|
||||
return do_evict<false>(should_evict_index);
|
||||
}
|
||||
|
||||
// Evicts a single element using the W-TinyLFU policy.
|
||||
// Evicts a single element from the LRU.
|
||||
// Will call on_evicted_shallow() instead of on_evicted().
|
||||
reclaiming_result evict_shallow() noexcept {
|
||||
return do_evict<true>(false);
|
||||
|
||||
Reference in New Issue
Block a user