Compare commits


24 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
173fb1e6d3 Clarify audit all-keyspaces exclusivity
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 15:32:11 +00:00
copilot-swe-agent[bot]
e252bb1550 Revise audit all-keyspaces design
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 15:04:40 +00:00
copilot-swe-agent[bot]
5713b5efd1 Finalize audit design doc clarifications
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:49:48 +00:00
copilot-swe-agent[bot]
979ec5ada8 Polish audit design doc review feedback
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:47:26 +00:00
copilot-swe-agent[bot]
67503a350b Refine audit prototype design details
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:45:38 +00:00
copilot-swe-agent[bot]
a90490c3cf Clarify audit design doc semantics
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:44:28 +00:00
copilot-swe-agent[bot]
6f957ea4e0 Add audit prototype design doc
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:43:44 +00:00
copilot-swe-agent[bot]
f6605f7b66 Initial plan 2026-03-17 14:37:56 +00:00
Asias He
6cb263bab0 repair: Prevent CPU stall during cross-shard row copy and destruction
When handling `repair_stream_cmd::end_of_current_rows`, passing the
foreign list directly to `put_row_diff_handler` triggered a massive
synchronous deep copy on the destination shard. Additionally, destroying
the list triggered a synchronous deallocation on the source shard. This
blocked the reactor and triggered the CPU stall detector.

This commit fixes the issue by introducing `clone_gently()` to copy the
list elements one by one, and leveraging the existing
`utils::clear_gently()` to destroy them. Both utilize
`seastar::coroutine::maybe_yield()` to allow the reactor to breathe
during large cross-shard transfers and cleanups.

Fixes SCYLLADB-403

Closes scylladb/scylladb#28979
2026-03-17 11:05:15 +02:00
Botond Dénes
035aa90d4b Merge 'Alternator: add per-table batch latency metrics and test coverage' from Amnon Heiman
This series fixes a metrics visibility gap in Alternator and adds regression coverage.

Until now, BatchGetItem and BatchWriteItem updated global latency histograms but did not consistently update per-table latency histograms. As a result, table-level latency dashboards could miss batch traffic.

The series updates the batch read/write paths to compute the request duration once and record it in both global and per-table latency metrics.

It also adds the missing tests, including a metric-agnostic helper and a dedicated per-table latency test that verifies latency counters increase for item and batch operations.

This change is metrics-only (no API/behavior change for requests) and improves observability consistency between global and per-table views.

Fixes #28721

**We assume the alternator per-table metrics exist, but the batch ones are not updated**

Closes scylladb/scylladb#28732

* github.com:scylladb/scylladb:
  test(alternator): add per-table latency coverage for item and batch ops
  alternator: track per-table latency for batch get/write operations
2026-03-16 17:18:00 +02:00
Michał Hudobski
40d180a7ef docs: update vector search filtering to reflect primary key support only
Remove outdated references to filtering on columns provided in the
index definition, and remove the note about equal relations (= and IN)
being the only supported operations. Vector search filtering currently
supports WHERE clauses on primary key columns only.

Closes scylladb/scylladb#28949
2026-03-16 17:16:16 +02:00
Botond Dénes
9de8d6798e Merge 'reader_concurrency_semaphore: skip preemptive abort for permits waiting for memory' from Łukasz Paszkowski
Permits in the `waiting_for_memory` state represent already-executing reads that are blocked on memory allocation. Preemptively aborting them is wasteful -- these reads have already consumed resources and made progress, so they should be allowed to complete.

Restrict the preemptive abort check in maybe_admit_waiters() to only apply to permits in the `waiting_for_admission` state, and tighten the state validation in `on_preemptive_aborted()` accordingly.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1016

Backport not needed: the commit introducing replica load shedding is not part of 2026.1.

Closes scylladb/scylladb#29025

* github.com:scylladb/scylladb:
  reader_concurrency_semaphore: skip preemptive abort for permits waiting for memory
  reader_concurrency_semaphore_test: detect memory leak on preemptive abort of waiting_for_memory permit
2026-03-16 17:14:25 +02:00
Calle Wilund
a5df2e79a7 storage_service: Wait for snapshot/backup before decommission
Fixes: SCYLLADB-244

Disables snapshot control such that any active ops finish/fail
before proceeding with decommission.
Note: snapshot control provided as argument, not member ref
due to storage_service being used from both main and cql_test_env.
(The latter has no snapshot_ctl to provide).

Could do the snapshot lockout on API level, but want to do
pre-checks before this.

Note: this just disables backup/snapshot fully. Could re-enable
after decommission, but this seems somewhat pointless.

v2:
* Add log message to snapshot shutdown
* Make test use log waiting instead of timeouts

Closes scylladb/scylladb#28980
2026-03-16 17:12:57 +02:00
bitpathfinder
85d5073234 test: Fix non-awaited coroutine in test_gossiper_empty_self_id_on_shadow_round
The line with the error was not actually needed and has therefore been removed.

Fixes: SCYLLADB-906

Closes scylladb/scylladb#28884
2026-03-16 17:07:36 +02:00
Botond Dénes
3e4e0c57b8 Merge 'Relax rf-rack-valid-keyspace option in backup/restore tests' from Pavel Emelyanov
Some tests, when creating a cluster, configure nodes with the rf-rack-valid option, because sometimes they want to have it OFF. For that, the option is explicitly carried around, but the cluster-creating helper can guess this option itself from the provided topology and replication factor.

Removing this option simplifies the code and, as a nicer outcome, the test "signature" that's used, e.g., on the command line to run a specific test.

Improving tests, not backporting

Closes scylladb/scylladb#28860

* github.com:scylladb/scylladb:
  test: Relax topology_rf_validity parameter for some tests
  test: Auto detect rf-rack-valid option in create_cluster()
2026-03-16 17:06:46 +02:00
Patryk Jędrzejczak
526e5986fe test: test_raft_no_quorum: decrease group0_raft_op_timeout_in_ms after quorum loss
`test_raft_no_quorum.py::test_cannot_add_new_node` is currently flaky in dev
mode. The bootstrap of the first node can fail due to `add_entry()` timing
out (with the 1s timeout set by the test case).

Other test cases in this test file could fail in the same way as well, so we
need a general fix. We don't want to increase the timeout in dev mode, as it
would slow down the test. The solution is to keep the timeout unchanged, but
set it only after quorum is lost. This prevents unexpected timeouts of group0
operations with almost no impact on the test running time.

A note about the new `update_group0_raft_op_timeout` function: waiting for
the log seems to be necessary only for
`test_quorum_lost_during_node_join_response_handler`, but let's do it
for all test cases just in case (including `test_can_restart` that shouldn't
be flaky currently).

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-913

Closes scylladb/scylladb#28998
2026-03-16 16:58:15 +02:00
Dani Tweig
bc0952781a Update Jira sync calling workflow to consolidated view
Replaced multiple per-action workflow jobs with a single consolidated
call to main_pr_events_jira_sync.yml. Added 'edited' event trigger.
This makes CI actions in PRs more readable and workflow execution faster.

Fixes: PM-253

Closes scylladb/scylladb#29042
2026-03-16 08:25:32 +02:00
Artsiom Mishuta
755d528135 test.py: fix warnings
Changes in this commit:
1) rename the class from 'TestContext' to 'Context' so pytest will not consider it a test class

2) extend the pytest filterwarnings list to ignore warnings from external libs

3) use datetime.datetime.now(datetime.UTC) instead of datetime.datetime.utcnow()

4) use ResultSet.one() instead of ResultSet[0]

Fixes SCYLLADB-904
Fixes SCYLLADB-908
Related SCYLLADB-902

Closes scylladb/scylladb#28956
2026-03-15 12:00:10 +02:00
Pavel Emelyanov
d544d8602d test: Relax topology_rf_validity parameter for some tests
Tests that call create_cluster() helper no longer need to carry the
rf-validity parameter. This simplifies the code and test signature.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2026-03-13 14:30:32 +03:00
Pavel Emelyanov
313985fed7 test: Auto detect rf-rack-valid option in create_cluster()
The helper accepts it as a boolean argument, but it can easily estimate
the value from the provided topology.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2026-03-13 14:30:32 +03:00
Łukasz Paszkowski
4c4d043a3b reader_concurrency_semaphore: skip preemptive abort for permits waiting for memory
Permits in the `waiting_for_memory` state represent already-executing
reads that are blocked on memory allocation. Preemptively aborting
them is wasteful -- these reads have already consumed resources and
made progress, so they should be allowed to complete.

Restrict the preemptive abort check in maybe_admit_waiters() to only
apply to permits in the `waiting_for_admission` state, and tighten
the state validation in `on_preemptive_aborted()` accordingly.

Adjust the following tests:
+ test_reader_concurrency_semaphore_abort_preemptively_aborted_permit
  no longer relies on requesting memory
+ test_reader_concurrency_semaphore_preemptive_abort_requested_memory_leak
  adjusted to the fix

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1016
2026-03-13 09:50:05 +01:00
Andrzej Jackowski
3b9cd52a95 reader_concurrency_semaphore_test: detect memory leak on preemptive abort of waiting_for_memory permit
A permit in `waiting_for_memory` state can be preemptively aborted by
maybe_admit_waiters(). This is wrong: such permits have already been
admitted and are actively processing a read — they are merely blocked
waiting for memory under serialize-limit pressure.

When `on_preemptive_aborted()` fires on a `waiting_for_memory` permit,
it does not clear `_requested_memory`. A subsequent `request_memory()`
call accumulates on top of the stale value, causing `on_granted_memory()`
to consume more than resource_units tracks.

This commit adds a test that confirms that scenario by counting
internal_errors.
2026-03-12 17:09:34 +01:00
Amnon Heiman
aca5284b13 test(alternator): add per-table latency coverage for item and batch ops
Add missing tests for per-table Alternator latency metrics to ensure recent
per-table latency accounting is actually validated.

Changes in this patch:

- Refactor the latency assertion helper into check_sets_latency_by_metric(), parameterized by metric name.
- Keep existing behavior by implementing check_sets_latency() as a wrapper over scylla_alternator_op_latency.
- Add test_item_latency_per_table() to verify scylla_alternator_table_op_latency_count increases for: PutItem, GetItem, DeleteItem, UpdateItem, BatchWriteItem, and BatchGetItem.

This closes a test gap where only global latency metrics were checked, while
per-table latency metrics were not covered.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
2026-02-25 20:51:18 +02:00
Amnon Heiman
29e0b4e08c alternator: track per-table latency for batch get/write operations
Batch operations were updating only global latency histograms, which left
table-level latency metrics incomplete.

This change computes the request duration once at the end of each operation and
reuses it to update both global and per-table latency stats: latencies are
recorded for each table used by the request.

This aligns batch read/write metric behavior with other operations and improves
per-table observability.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
2026-02-25 20:51:18 +02:00
32 changed files with 643 additions and 927 deletions

View File

@@ -1,8 +1,8 @@
 name: Sync Jira Based on PR Events
 on:
   pull_request_target:
-    types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
+    types: [opened, edited, ready_for_review, review_requested, labeled, unlabeled, closed]
 permissions:
   contents: read
@@ -10,32 +10,9 @@ permissions:
   issues: write
 jobs:
-  jira-sync-pr-opened:
-    if: github.event.action == 'opened'
-    uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
-    secrets:
-      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
-  jira-sync-in-review:
-    if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
-    uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
-    secrets:
-      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
-  jira-sync-add-label:
-    if: github.event.action == 'labeled'
-    uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
-    secrets:
-      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
-  jira-status-remove-label:
-    if: github.event.action == 'unlabeled'
-    uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
-    secrets:
-      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
-  jira-status-pr-closed:
-    if: github.event.action == 'closed'
-    uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
+  jira-sync:
+    uses: scylladb/github-automation/.github/workflows/main_pr_events_jira_sync.yml@main
+    with:
+      caller_action: ${{ github.event.action }}
+    secrets:
+      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -3463,7 +3463,11 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
     if (should_add_wcu) {
         rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
     }
-    _stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
+    auto duration = std::chrono::steady_clock::now() - start_time;
+    _stats.api_operations.batch_write_item_latency.mark(duration);
+    for (const auto& w : per_table_wcu) {
+        w.first->api_operations.batch_write_item_latency.mark(duration);
+    }
     co_return rjson::print(std::move(ret));
 }
@@ -4974,7 +4978,12 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
     if (!some_succeeded && eptr) {
        co_await coroutine::return_exception_ptr(std::move(eptr));
     }
-    _stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
+    auto duration = std::chrono::steady_clock::now() - start_time;
+    _stats.api_operations.batch_get_item_latency.mark(duration);
+    for (const table_requests& rs : requests) {
+        lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
+        per_table_stats->api_operations.batch_get_item_latency.mark(duration);
+    }
     if (is_big(response)) {
         co_return make_streamed(std::move(response));
     } else {

View File

@@ -122,9 +122,9 @@ future<> unset_thrift_controller(http_context& ctx) {
     return ctx.http_server.set_routes([&ctx] (routes& r) { unset_thrift_controller(ctx, r); });
 }
-future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
-    return ctx.http_server.set_routes([&ctx, &ss, &group0_client] (routes& r) {
-        set_storage_service(ctx, r, ss, group0_client);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
+    return ctx.http_server.set_routes([&ctx, &ss, &ssc, &group0_client] (routes& r) {
+        set_storage_service(ctx, r, ss, ssc, group0_client);
     });
 }

View File

@@ -98,7 +98,7 @@ future<> set_server_config(http_context& ctx, db::config& cfg);
 future<> unset_server_config(http_context& ctx);
 future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
 future<> unset_server_snitch(http_context& ctx);
-future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
 future<> unset_server_storage_service(http_context& ctx);
 future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
 future<> unset_server_client_routes(http_context& ctx);

View File

@@ -835,9 +835,9 @@ rest_force_keyspace_flush(http_context& ctx, std::unique_ptr<http::request> req)
 static
 future<json::json_return_type>
-rest_decommission(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
+rest_decommission(sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, std::unique_ptr<http::request> req) {
     apilog.info("decommission");
-    return ss.local().decommission().then([] {
+    return ss.local().decommission(ssc).then([] {
         return make_ready_future<json::json_return_type>(json_void());
     });
 }
@@ -1782,7 +1782,7 @@ rest_bind(FuncType func, BindArgs&... args) {
     return std::bind_front(func, std::ref(args)...);
 }
-void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
     ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
     ss::toppartitions_generic.set(r, rest_bind(rest_toppartitions_generic, ctx));
     ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
@@ -1799,7 +1799,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
     ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
     ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
     ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
-    ss::decommission.set(r, rest_bind(rest_decommission, ss));
+    ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc));
     ss::move.set(r, rest_bind(rest_move, ss));
     ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
     ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
@@ -2141,6 +2141,7 @@ void unset_snapshot(http_context& ctx, routes& r) {
     ss::start_backup.unset(r);
     cf::get_true_snapshots_size.unset(r);
     cf::get_all_true_snapshots_size.unset(r);
+    ss::decommission.unset(r);
 }
 }

View File

@@ -66,7 +66,7 @@ struct scrub_info {
 scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::request> req);
-void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, service::raft_group0_client&);
+void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
 void unset_storage_service(http_context& ctx, httpd::routes& r);
 void set_sstables_loader(http_context& ctx, httpd::routes& r, sharded<sstables_loader>& sst_loader);
 void unset_sstables_loader(http_context& ctx, httpd::routes& r);

View File

@@ -597,7 +597,6 @@ scylla_tests = set([
     'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
     'test/boost/logalloc_test',
     'test/boost/lru_string_map_test',
-    'test/boost/lru_test',
     'test/boost/managed_bytes_test',
     'test/boost/managed_vector_test',
     'test/boost/map_difference_test',
@@ -1584,7 +1583,6 @@ pure_boost_tests = set([
     'test/boost/like_matcher_test',
     'test/boost/linearizing_input_stream_test',
     'test/boost/lru_string_map_test',
-    'test/boost/lru_test',
     'test/boost/map_difference_test',
     'test/boost/nonwrapping_interval_test',
     'test/boost/observable_test',

View File

@@ -214,7 +214,11 @@ void cache_tracker::clear() {
 }
 void cache_tracker::touch(rows_entry& e) {
-    _lru.touch(e);
+    // last dummy may not be linked if evicted
+    if (e.is_linked()) {
+        _lru.remove(e);
+    }
+    _lru.add(e);
 }
 void cache_tracker::insert(cache_entry& entry) {
void cache_tracker::insert(cache_entry& entry) {

View File

@@ -39,10 +39,19 @@ snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::stor
 }
 future<> snapshot_ctl::stop() {
-    co_await _ops.close();
+    co_await disable_all_operations();
     co_await _task_manager_module->stop();
 }
+future<> snapshot_ctl::disable_all_operations() {
+    if (!_ops.is_closed()) {
+        if (_ops.get_count()) {
+            snap_log.info("Waiting for snapshot/backup tasks to finish");
+        }
+        co_await _ops.close();
+    }
+}
 future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
     auto& ks = _db.local().find_keyspace(ks_name);
     return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {

View File

@@ -120,6 +120,8 @@ public:
     future<int64_t> true_snapshots_size();
     future<int64_t> true_snapshots_size(sstring ks, sstring cf);
+    future<> disable_all_operations();
+
 private:
     config _config;
     sharded<replica::database>& _db;

View File

@@ -292,8 +292,8 @@ For example::
        ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
-Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
-or columns provided in a definition of the index.
+Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
 See :ref:`WHERE <where-clause>`.
 For example::
@@ -301,10 +301,6 @@ For example::
        WHERE user_id = 'user123'
        ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
-The supported operations are equal relations (``=`` and ``IN``) with restrictions as in regular ``WHERE`` clauses. See :ref:`WHERE <where-clause>`.
-Other filtering scenarios are currently not supported.
 .. note::
    Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.

View File

@@ -1,111 +1,347 @@
# Introduction
# Prototype design: auditing all keyspaces and per-role auditing
Similar to the approach described in CASSANDRA-12151, we add the
concept of an audit specification. An audit has a target (syslog or a
table) and a set of events/actions that it wants recorded. We
introduce new CQL syntax for Scylla users to describe and manipulate
audit specifications.
## Summary
Prior art:
- Microsoft SQL Server [audit
description](https://docs.microsoft.com/en-us/sql/relational-databases/security/auditing/sql-server-audit-database-engine?view=sql-server-ver15)
- pgAudit [docs](https://github.com/pgaudit/pgaudit/blob/master/README.md)
- MySQL audit_log docs in
[MySQL](https://dev.mysql.com/doc/refman/8.0/en/audit-log.html) and
[Azure](https://docs.microsoft.com/en-us/azure/mysql/concepts-audit-logs)
- DynamoDB can [use CloudTrail](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/logging-using-cloudtrail.html) to log all events
Extend the existing `scylla.yaml`-driven audit subsystem with two focused capabilities:
# CQL extensions
1. allow auditing **all keyspaces** without enumerating them one by one
2. allow auditing only a configured set of **roles**
## Create an audit
The prototype should stay close to the current implementation in `audit/`:
```cql
CREATE AUDIT [IF NOT EXISTS] audit-name WITH TARGET { SYSLOG | table-name }
[ AND TRIGGER KEYSPACE IN (ks1, ks2, ks3) ]
[ AND TRIGGER TABLE IN (tbl1, tbl2, tbl3) ]
[ AND TRIGGER ROLE IN (usr1, usr2, usr3) ]
[ AND TRIGGER CATEGORY IN (cat1, cat2, cat3) ]
;
- keep the existing backends (`table`, `syslog`, or both)
- keep the existing category / keyspace / table filters
- preserve live updates for audit configuration
- avoid any schema change to `audit.audit_log`
This is intentionally a small extension of the current auditing model, not a redesign around new CQL statements such as `CREATE AUDIT`.
## Motivation
Today Scylla exposes three main audit selectors:
- `audit_categories`
- `audit_tables`
- `audit_keyspaces`
This leaves two operational gaps:
1. **Auditing all keyspaces is cumbersome.**
Large installations may create keyspaces dynamically, or manage many tenant keyspaces. Requiring operators to keep
`audit_keyspaces` synchronized with the full keyspace list is error-prone and defeats the point of cluster-wide auditing.
2. **Auditing is all-or-nothing with respect to users.**
Once a category/keyspace/table combination matches, any authenticated user generating that traffic is audited.
Operators want to narrow the scope to specific tenants, service accounts, or privileged roles.
These two additions also work well together: "audit all keyspaces, but only for selected roles" is a practical way to reduce
both audit volume and performance impact.
## Goals
- Add a way to express "all keyspaces" in the current configuration model.
- Add a new role filter that limits auditing to selected roles.
- Preserve backwards compatibility for existing configurations.
- Keep the evaluation cheap on the request path.
- Support live configuration updates, consistent with the existing audit options.
## Non-goals
- Introducing `CREATE AUDIT`, `ALTER AUDIT`, or other new CQL syntax.
- Adding per-role audit destinations.
- Adding different categories per role.
- Expanding role matching through the full granted-role graph in the prototype.
- Changing the on-disk audit table schema.
## Current behavior
At the moment, audit logging is controlled by:
- `audit`
- `audit_categories`
- `audit_tables`
- `audit_keyspaces`
The current decision rule in `audit::should_log()` is effectively:
```text
category matches
&& (
keyspace is listed in audit_keyspaces
|| table is listed in audit_tables
|| category in {AUTH, ADMIN, DCL}
)
```
From this point on, every database event that matches all present
triggers will be recorded in the target. When the target is a table,
it behaves like the [current
design](https://docs.scylladb.com/operating-scylla/security/auditing/#table-storage).
Observations:
The audit name must be different from all other audits, unless IF NOT
EXISTS precedes it, in which case the existing audit must be identical
to the new definition. Case sensitivity and length limit are the same
as for table names.
- `AUTH`, `ADMIN`, and `DCL` are already global once their category is enabled.
- `DDL`, `DML`, and `QUERY` need a matching keyspace or table.
- An empty `audit_keyspaces` means "audit no keyspaces", not "audit every keyspace".
- There is no role-based filter; the authenticated user is recorded in the log but is not part of the decision.
- The exact implementation to preserve is in `audit/audit.cc` (`should_log()`, `inspect()`, and `inspect_login()`).
A trigger kind (i.e., `KEYSPACE`, `TABLE`, `ROLE`, or `CATEGORY`) can be
specified at most once.
## Proposed configuration
## Show an audit
### 1. Add `audit_all_keyspaces`
```cql
DESCRIBE AUDIT [audit-name ...];
Introduce a new live-update boolean option:
Examples:
```yaml
# Audit all keyspaces for matching categories
audit_all_keyspaces: true
# Audit all keyspaces for selected roles
audit_all_keyspaces: true
audit_roles: "alice,bob"
```
Prints definitions of all audits named herein. If no names are
provided, prints all audits.
Semantics:
## Delete an audit
- `audit_all_keyspaces: false` keeps the existing behavior.
- `audit_all_keyspaces: true` makes every keyspace match.
- `audit_keyspaces` keeps its existing meaning: an explicit list of keyspaces, or no keyspace-wide auditing when left empty.
- `audit_all_keyspaces: true` and a non-empty `audit_keyspaces` must be rejected as invalid configuration,
because the two options express overlapping scope in different ways.
- A dedicated boolean is preferable to overloading `audit_keyspaces`, because it avoids changing the meaning of existing configurations.
- This also keeps the behavior aligned with today's `audit_tables` handling, where leaving `audit_tables` empty does not introduce a new wildcard syntax.
```cql
DROP AUDIT audit-name;
### 2. Add `audit_roles`
Introduce a new live-update configuration option:
```yaml
audit_roles: "alice,bob,service_api"
```
Stops logging events specified by this audit. Doesn't impact the
already logged events. If the target is a table, it remains as it is.
Semantics:
## Alter an audit
- empty `audit_roles` means **no role filtering**, preserving today's behavior
- non-empty `audit_roles` means audit only requests whose effective logged username matches one of the configured roles
- matching is byte-for-byte exact, using the same role name that is already written to the audit record's `username` column / syslog field
- the prototype should compare against the post-authentication role name from the session and audit log,
with no additional case folding or role-graph expansion
```cql
ALTER AUDIT audit-name WITH {same syntax as CREATE}
Examples:
```yaml
# Audit all roles in a single keyspace (current behavior, made explicit)
audit_keyspaces: "ks1"
audit_roles: ""
# Audit two roles across all keyspaces
audit_all_keyspaces: true
audit_roles: "alice,bob"
# Audit a service role, but only for selected tables
audit_tables: "ks1.orders,ks1.payments"
audit_roles: "billing_service"
```
Any trigger provided will be updated (or newly created, if previously
absent). To drop a trigger, use `IN *`.
## Decision rule after the change
## Permissions
After the prototype, the rule becomes:
Only superusers can modify audits or turn them on and off.
```text
category matches
&& role matches
&& (
category in {AUTH, ADMIN, DCL}
|| audit_all_keyspaces
|| keyspace is listed in audit_keyspaces
|| table is listed in audit_tables
)
```
Only superusers can read tables that are audit targets; no user can
modify them. Only superusers can drop tables that are audit targets,
after the audit itself is dropped. If a superuser doesn't drop a
target table, it remains in existence indefinitely.
Where:
# Implementation
- `role matches` is always true when `audit_roles` is empty
- `audit_all_keyspaces` is true when the new boolean option is enabled
## Efficient trigger evaluation
For login auditing, the rule is simply:
```text
AUTH category enabled && role matches(login username)
```
## Implementation details
### Configuration parsing
Add two new config entries:

- `db::config::audit_all_keyspaces`
- `db::config::audit_roles`

They should mirror the existing audit selectors:

- `audit_all_keyspaces`: type `named_value<bool>`, liveness `LiveUpdate`, default `false`
- `audit_roles`: type `named_value<sstring>`, liveness `LiveUpdate`, default empty string
Parsing changes:
- keep `parse_audit_tables()` as-is
- keep `parse_audit_keyspaces()` semantics as-is
- add `parse_audit_roles()` that returns a set of role names
- normalize empty or whitespace-only keyspace lists to an empty configuration rather than treating them as real keyspace names
- add cross-field validation so `audit_all_keyspaces: true` cannot be combined with a non-empty
`audit_keyspaces`, both at startup and during live updates
To avoid re-parsing on every request, the `audit::audit` service should store:
```c++
namespace audit {
/// Stores triggers from an AUDIT statement.
class triggers {
// Use trie structures for speedy string lookup.
optional<trie> _ks_trigger, _tbl_trigger, _usr_trigger;
// A logical-AND filter.
optional<unsigned> _cat_trigger;
public:
/// True iff every non-null trigger matches the corresponding ainf element.
bool should_audit(const audit_info& ainf);
};
} // namespace audit
bool _audit_all_keyspaces;
std::set<sstring> _audited_keyspaces;
std::set<sstring> _audited_roles;
```
To prevent modification of target tables, `audit::inspect()` will
check the statement and throw if it is disallowed, similar to what
`check_access()` currently does.
Using a dedicated boolean keeps the hot-path check straightforward and avoids reinterpreting the existing
`_audited_keyspaces` selector.
Using `std::set` for the explicit selectors keeps the prototype aligned with the current implementation and minimizes code churn.
If profiling later shows that lookup cost matters here, the container choice can be revisited independently of the feature semantics.
## Persisting audit definitions
An audit definition must survive a server restart and stay consistent across
all nodes in a cluster. We accomplish both by storing audits in a system table.
### Audit object changes
The current `audit_info` already carries:
- category
- keyspace
- table
- query text
The username is available separately from `service::query_state` and is already passed to storage helpers when an entry is written.
For the prototype there is no need to duplicate the username into `audit_info`.
Instead:
- change `should_log()` to take the effective username as an additional input
- change `should_log_login()` to check the username against `audit_roles`
- keep the storage helpers unchanged, because they already persist the username
- update the existing internal call sites in `inspect()` and `inspect_login()` to pass the username through
One possible interface shape is:
```c++
bool should_log(std::string_view username, const audit_info* info) const;
bool should_log_login(std::string_view username) const;
```
### Role semantics
For the prototype, "role" means the role name already associated with the current client session:
- successful authenticated sessions use the session's user name
- failed login events use the login name from the authentication attempt
- failed login events are still subject to `audit_roles`, matched against the attempted login name
This keeps the feature easy to explain and aligns the filter with what users already see in audit output.
The prototype should **not** try to expand inherited roles. If a user logs in as `alice` and inherits permissions from another role,
the audit filter still matches `alice`. This keeps the behavior deterministic and avoids expensive role graph lookups on the request path.
### Keyspace semantics
`audit_all_keyspaces: true` should affect any statement whose `audit_info` carries a keyspace name.
Important consequences:
- it makes `DDL` / `DML` / `QUERY` auditing effectively cluster-wide
- it does not change the existing global handling of `AUTH`, `ADMIN`, and `DCL`
- statements that naturally have no keyspace name continue to depend on their category-specific behavior
No extra schema or metadata scan is required: the request already carries the keyspace information needed for the decision.
## Backwards compatibility
This design keeps existing behavior intact:
- existing clusters that do not set `audit_roles` continue to audit all roles
- existing clusters that leave `audit_keyspaces` empty continue to audit no keyspaces
- existing explicit keyspace/table lists keep their current meaning
The feature is enabled only by a new explicit boolean, so existing `audit_keyspaces` values do not need to be reinterpreted.
The only newly-invalid combination is enabling `audit_all_keyspaces` while also listing explicit keyspaces.
## Operational considerations
### Performance and volume
`audit_all_keyspaces: true` can significantly increase audit volume, especially with `QUERY` and `DML`.
The intended mitigation is to combine it with:
- a narrow `audit_categories`
- a narrow `audit_roles`
That combination gives operators a simple and cheap filter model:
- first by category
- then by role
- then by keyspace/table scope
### Live updates
`audit_roles` should follow the same live-update behavior as the current audit filters.
Changing:
- `audit_roles`
- `audit_all_keyspaces`
- `audit_keyspaces`
- `audit_tables`
- `audit_categories`
should update the in-memory selectors on all shards without restarting the node.
### Prototype limitation
Because matching is done against the authenticated session role name, `audit_roles` cannot express "audit everyone who inherits role X".
Operators must list the concrete login roles they want to audit. This is a deliberate trade-off in the prototype to keep matching cheap
and avoid role graph lookups on every audited request.
Example: if `alice` inherits permissions from `admin_role`, configuring `audit_roles: "admin_role"` would not audit requests from
`alice`; to audit those requests, `alice` itself must be listed.
### Audit table schema
No schema change is needed. The audit table already includes `username`, which is sufficient for both storage and later analysis.
## Testing plan
The prototype should extend existing audit coverage rather than introduce a separate test framework.
### Parser / unit coverage
Add focused tests for:
- empty `audit_roles`
- specific `audit_roles`
- `audit_all_keyspaces: true`
- invalid mixed configuration: `audit_all_keyspaces: true` with non-empty `audit_keyspaces`
- empty or whitespace-only keyspace lists such as `",,,"` or `" "`, which should normalize to an empty configuration and therefore audit no keyspaces
- boolean config parsing for `audit_all_keyspaces`
### Behavioral coverage
Extend the existing audit tests in `test/cluster/dtest/audit_test.py` with scenarios such as:
1. `audit_all_keyspaces: true` audits statements in multiple keyspaces without listing them explicitly
2. `audit_roles: "alice"` logs requests from `alice` but not from `bob`
3. `audit_all_keyspaces: true` + `audit_roles: "alice"` only logs `alice`'s traffic cluster-wide
4. login auditing respects `audit_roles`
5. live-updating `audit_roles` changes behavior without restart
6. setting `audit_all_keyspaces: true` together with explicit `audit_keyspaces` is rejected with a clear error
## Future evolution
This prototype is deliberately small, but it fits a broader audit-spec design if we decide to revisit that later.
In a future CQL-driven design, these two additions map naturally to triggers such as:
- `TRIGGER KEYSPACE IN *`
- `TRIGGER ROLE IN (...)`
That means the prototype is not throwaway work: it improves the current operational model immediately while keeping a clean path
toward richer audit objects in the future.


@@ -10,27 +10,7 @@ Cache is always paired with its underlying mutation source which it mirrors. Tha
Eviction is about removing parts of the data from memory and recording the fact that information about those parts is missing. Eviction doesn't change the set of writes represented by cache as part of its `mutation_source` interface.
The smallest object which can be evicted, called eviction unit, is currently a single row (`rows_entry`). Eviction units are managed by a W-TinyLFU policy owned by a `cache_tracker`. The W-TinyLFU policy determines eviction order. It is shared among many tables. Currently, there is one per `database`.
### W-TinyLFU Eviction Policy
The cache uses a W-TinyLFU (Window Tiny Least Frequently Used) eviction policy,
which combines recency and frequency information for better hit rates than plain LRU.
The policy organizes entries into three segments:
- **Window** (~1% of cache): A small LRU that admits all new entries. This allows
new entries to build up frequency information before competing for main cache space.
- **Probation** (~19% of cache): Part of the main SLRU cache. Entries from the window
compete with probation victims for admission using a TinyLFU frequency filter.
- **Protected** (~80% of cache): The other part of the main SLRU cache. Entries are
promoted here from probation when accessed again.
The TinyLFU frequency filter uses a Count-Min Sketch to compactly estimate access
frequency. When eviction is needed, the window victim competes with the probation
victim: the entry with higher estimated frequency survives in probation while the
other is evicted. The sketch is periodically aged (all counts halved) to adapt to
changing access patterns.
The smallest object which can be evicted, called eviction unit, is currently a single row (`rows_entry`). Eviction units are linked in an LRU owned by a `cache_tracker`. The LRU determines eviction order. The LRU is shared among many tables. Currently, there is one per `database`.
All `rows_entry` objects which are owned by a `cache_tracker` are assumed to be either contained in a cache (in some `row_cache::partitions_type`) or
be owned by a (detached) `partition_snapshot`. When the last row from a `partition_entry` is evicted, the containing `cache_entry` is evicted from the cache.


@@ -2236,7 +2236,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return m.start();
}).get();
api::set_server_storage_service(ctx, ss, group0_client).get();
api::set_server_storage_service(ctx, ss, snapshot_ctl, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});


@@ -371,7 +371,7 @@ public:
}
void on_preemptive_aborted() {
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
if (_state != reader_permit::state::waiting_for_admission) {
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
}
@@ -1533,19 +1533,24 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
// + permit.timeout() < db::no_timeout -- to avoid preemptively aborting reads without timeout.
// Useful is tests when _preemptive_abort_factor is set to 1.0
// to avoid additional sleeps to wait for the read to be shed.
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() &&
permit.timeout() < db::no_timeout &&
remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
_name,
std::chrono::duration_cast<ms>(time_budget - remaining_time),
std::chrono::duration_cast<ms>(time_budget),
_preemptive_abort_factor());
continue;
//
// Only apply to permits waiting for admission -- permits waiting for memory are already
// executing reads and should not be preemptively aborted.
if (permit.get_state() == reader_permit::state::waiting_for_admission) {
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() &&
permit.timeout() < db::no_timeout &&
remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
_name,
std::chrono::duration_cast<ms>(time_budget - remaining_time),
std::chrono::duration_cast<ms>(time_budget),
_preemptive_abort_factor());
continue;
}
}
if (permit.get_state() == reader_permit::state::waiting_for_memory) {


@@ -2362,6 +2362,15 @@ static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
}
}
static future<repair_rows_on_wire> clone_gently(const repair_rows_on_wire& rows) {
repair_rows_on_wire cloned;
for (const auto& row : rows) {
cloned.push_back(row);
co_await seastar::coroutine::maybe_yield();
}
co_return cloned;
}
static future<> repair_put_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
@@ -2388,7 +2397,9 @@ static future<> repair_put_row_diff_with_rpc_stream_process_op(
co_await rm->put_row_diff_handler(std::move(*fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
} else {
co_await rm->put_row_diff_handler(*fp);
// Gently clone to avoid copy stall on destination shard
repair_rows_on_wire local_rows = co_await clone_gently(*fp);
co_await seastar::when_all_succeed(rm->put_row_diff_handler(std::move(local_rows)), utils::clear_gently(fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
}
});


@@ -2794,12 +2794,18 @@ future<> storage_service::raft_decommission() {
}
}
future<> storage_service::decommission() {
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
return seastar::async([&ss] {
future<> storage_service::decommission(sharded<db::snapshot_ctl>& snapshot_ctl) {
return run_with_api_lock(sstring("decommission"), [&] (storage_service& ss) {
return seastar::async([&] {
if (ss._operation_mode != mode::NORMAL) {
throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
}
snapshot_ctl.invoke_on_all([](auto& sctl) {
return sctl.disable_all_operations();
}).get();
slogger.info("DECOMMISSIONING: disabled backup and snapshots");
ss.raft_decommission().get();
ss.stop_transport().get();


@@ -77,6 +77,7 @@ namespace db {
class system_distributed_keyspace;
class system_keyspace;
class batchlog_manager;
class snapshot_ctl;
namespace view {
class view_builder;
class view_building_worker;
@@ -666,7 +667,7 @@ private:
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const;
public:
future<> decommission();
future<> decommission(sharded<db::snapshot_ctl>&);
private:
future<> unbootstrap();


@@ -471,14 +471,17 @@ def test_streams_operations(test_table_s, dynamodbstreams, metrics):
# to update latencies for one kind of operation (#17616, and compare #9406),
# and to do that checking that ..._count increases for that op is enough.
@contextmanager
def check_sets_latency(metrics, operation_names):
def check_sets_latency_by_metric(metrics, operation_names, metric_name):
the_metrics = get_metrics(metrics)
saved_latency_count = { x: get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': x}, the_metrics) for x in operation_names }
saved_latency_count = { x: get_metric(metrics, f'{metric_name}_count', {'op': x}, the_metrics) for x in operation_names }
yield
the_metrics = get_metrics(metrics)
for op in operation_names:
# The total "count" on all shards should strictly increase
assert saved_latency_count[op] < get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': op}, the_metrics)
assert saved_latency_count[op] < get_metric(metrics, f'{metric_name}_count', {'op': op}, the_metrics)
def check_sets_latency(metrics, operation_names):
return check_sets_latency_by_metric(metrics, operation_names, 'scylla_alternator_op_latency')
# Test latency metrics for PutItem, GetItem, DeleteItem, UpdateItem.
# We can't check what exactly the latency is - just that it gets updated.
@@ -494,6 +497,18 @@ def test_item_latency(test_table_s, metrics):
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
def test_item_latency_per_table(test_table_s, metrics):
with check_sets_latency_by_metric(metrics, ['DeleteItem', 'GetItem', 'PutItem', 'UpdateItem', 'BatchWriteItem', 'BatchGetItem'], 'scylla_alternator_table_op_latency'):
p = random_string()
test_table_s.put_item(Item={'p': p})
test_table_s.get_item(Key={'p': p})
test_table_s.delete_item(Key={'p': p})
test_table_s.update_item(Key={'p': p})
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
# Test latency metrics for GetRecords. Other Streams-related operations -
# ListStreams, DescribeStream, and GetShardIterator, have an operation
# count (tested above) but do NOT currently have a latency histogram.


@@ -1,287 +0,0 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#define BOOST_TEST_MODULE lru
#include <boost/test/unit_test.hpp>
#include <vector>
#include <algorithm>
#include <memory>
#include "utils/count_min_sketch.hh"
#include "utils/lru.hh"
// A concrete evictable for testing.
struct test_evictable final : public evictable {
int id;
bool was_evicted = false;
explicit test_evictable(int id) : id(id) {}
void on_evicted() noexcept override {
was_evicted = true;
}
};
// ---------------------------------------------------------------------------
// Count-Min Sketch Tests
// ---------------------------------------------------------------------------
// Width = 2^test_sketch_width_log2 = 1024 counters per row.
static constexpr size_t test_sketch_width_log2 = 10;
BOOST_AUTO_TEST_CASE(test_count_min_sketch_basic) {
utils::count_min_sketch sketch(test_sketch_width_log2);
// An unseen key should have estimate 0.
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 0);
sketch.increment(42);
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 1);
sketch.increment(42);
sketch.increment(42);
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 3);
// A different key should be independent.
BOOST_REQUIRE_EQUAL(sketch.estimate(100), 0);
sketch.increment(100);
BOOST_REQUIRE_EQUAL(sketch.estimate(100), 1);
BOOST_REQUIRE_EQUAL(sketch.estimate(42), 3);
}
BOOST_AUTO_TEST_CASE(test_count_min_sketch_max_counter) {
utils::count_min_sketch sketch(test_sketch_width_log2);
for (int i = 0; i < 20; ++i) {
sketch.increment(1);
}
// 4-bit counter caps at 15.
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 15);
}
BOOST_AUTO_TEST_CASE(test_count_min_sketch_decay) {
utils::count_min_sketch sketch(test_sketch_width_log2);
sketch.increment(1);
sketch.increment(1);
sketch.increment(1);
sketch.increment(1); // freq = 4
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 4);
sketch.decay(); // halve → 2
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 2);
sketch.decay(); // halve → 1
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 1);
sketch.decay(); // halve → 0
BOOST_REQUIRE_EQUAL(sketch.estimate(1), 0);
}
// ---------------------------------------------------------------------------
// W-TinyLFU LRU Tests
// ---------------------------------------------------------------------------
BOOST_AUTO_TEST_CASE(test_lru_add_and_evict) {
lru l;
test_evictable e1(1), e2(2), e3(3);
l.add(e1);
l.add(e2);
l.add(e3);
BOOST_REQUIRE(e1.is_linked());
BOOST_REQUIRE(e2.is_linked());
BOOST_REQUIRE(e3.is_linked());
// Evict removes at least one entry.
auto r = l.evict();
BOOST_REQUIRE(r == seastar::memory::reclaiming_result::reclaimed_something);
// At least one entry should have been evicted.
int evicted_count = (e1.was_evicted ? 1 : 0) + (e2.was_evicted ? 1 : 0) + (e3.was_evicted ? 1 : 0);
BOOST_REQUIRE_GE(evicted_count, 1);
// Clean up remaining linked entries.
if (e1.is_linked()) l.remove(e1);
if (e2.is_linked()) l.remove(e2);
if (e3.is_linked()) l.remove(e3);
}
BOOST_AUTO_TEST_CASE(test_lru_evict_empty) {
lru l;
auto r = l.evict();
BOOST_REQUIRE(r == seastar::memory::reclaiming_result::reclaimed_nothing);
}
BOOST_AUTO_TEST_CASE(test_lru_touch_keeps_entry_alive) {
lru l;
// Create entries with different access patterns.
test_evictable hot(1), cold1(2), cold2(3);
l.add(hot);
l.add(cold1);
l.add(cold2);
// Touch 'hot' many times to build frequency.
for (int i = 0; i < 10; ++i) {
l.touch(hot);
}
// Evict all - the hot entry may survive longer than cold entries.
l.evict();
l.evict();
// Hot entry should still be linked (survived eviction of cold entries).
BOOST_REQUIRE(hot.is_linked());
// Clean up.
l.remove(hot);
// cold entries may or may not still be linked, clean up if needed.
if (cold1.is_linked()) l.remove(cold1);
if (cold2.is_linked()) l.remove(cold2);
}
BOOST_AUTO_TEST_CASE(test_lru_evict_all) {
lru l;
test_evictable e1(1), e2(2), e3(3);
l.add(e1);
l.add(e2);
l.add(e3);
l.evict_all();
BOOST_REQUIRE(!e1.is_linked());
BOOST_REQUIRE(!e2.is_linked());
BOOST_REQUIRE(!e3.is_linked());
BOOST_REQUIRE(e1.was_evicted);
BOOST_REQUIRE(e2.was_evicted);
BOOST_REQUIRE(e3.was_evicted);
}
BOOST_AUTO_TEST_CASE(test_lru_remove) {
lru l;
test_evictable e1(1), e2(2), e3(3);
l.add(e1);
l.add(e2);
l.add(e3);
l.remove(e2);
BOOST_REQUIRE(!e2.is_linked());
BOOST_REQUIRE(!e2.was_evicted); // remove does not call on_evicted
l.evict_all();
BOOST_REQUIRE(e1.was_evicted);
BOOST_REQUIRE(e3.was_evicted);
}
BOOST_AUTO_TEST_CASE(test_lru_add_before) {
lru l;
test_evictable e1(1), e2(2), e3(3);
l.add(e1);
l.add(e2);
// Insert e3 before e2 so e3 is evicted before e2.
l.add_before(e2, e3);
BOOST_REQUIRE(e1.is_linked());
BOOST_REQUIRE(e2.is_linked());
BOOST_REQUIRE(e3.is_linked());
// Clean up.
l.evict_all();
}
BOOST_AUTO_TEST_CASE(test_lru_frequency_based_eviction) {
lru l;
// Create entries with different access patterns.
// Use a fixed-size array to avoid move construction issues.
static constexpr int N = 20;
std::unique_ptr<test_evictable> entries[N];
for (int i = 0; i < N; ++i) {
entries[i] = std::make_unique<test_evictable>(i);
}
for (int i = 0; i < N; ++i) {
l.add(*entries[i]);
}
// Touch entries 15-19 many times (they should be "hot").
for (int round = 0; round < 10; ++round) {
for (int i = 15; i < N; ++i) {
l.touch(*entries[i]);
}
}
// Evict half the entries.
for (int i = 0; i < 10; ++i) {
l.evict();
}
// Hot entries (15-19) should still be linked.
for (int i = 15; i < N; ++i) {
BOOST_REQUIRE_MESSAGE(entries[i]->is_linked(),
"Hot entry " << i << " should survive eviction");
}
// Clean up remaining entries.
for (int i = 0; i < N; ++i) {
if (entries[i]->is_linked()) {
l.remove(*entries[i]);
}
}
}
BOOST_AUTO_TEST_CASE(test_lru_touch_promotes_from_probation) {
lru l;
// Create entries.
static constexpr int N = 10;
std::unique_ptr<test_evictable> entries[N];
for (int i = 0; i < N; ++i) {
entries[i] = std::make_unique<test_evictable>(i);
}
for (int i = 0; i < N; ++i) {
l.add(*entries[i]);
}
// Evict and re-add some to force entries into probation via the eviction logic.
// The eviction drains excess from window to probation.
// After enough evictions, remaining entries should be in probation or protected.
// Touch entries 0-4 multiple times to build frequency.
for (int round = 0; round < 5; ++round) {
for (int i = 0; i < 5; ++i) {
l.touch(*entries[i]);
}
}
// Evict 5 entries - cold entries (5-9) should be evicted.
for (int i = 0; i < 5; ++i) {
l.evict();
}
// Entries 0-4 (frequently touched) should survive.
for (int i = 0; i < 5; ++i) {
BOOST_REQUIRE_MESSAGE(entries[i]->is_linked(),
"Frequently touched entry " << i << " should survive eviction");
}
// Clean up.
for (int i = 0; i < N; ++i) {
if (entries[i]->is_linked()) {
l.remove(*entries[i]);
}
}
}


@@ -2409,6 +2409,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
}
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_aborted_permit) {
simple_schema s;
const auto schema = s.schema();
const std::string test_name = get_name();
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
// Ensure permits are shed immediately during admission.
@@ -2421,27 +2425,24 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_ab
// Set a ridiculously long timeout to ensure permit will not be rejected due to timeout
auto timeout = db::timeout_clock::now() + 60min;
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, timeout, {}).get();
auto permit1 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
auto units1 = permit1.request_memory(2024).get();
auto permit2_units1 = permit2.request_memory(1024).get();
auto permit1_units = permit1.request_memory(8 * 1024).get();
reader_permit_opt permit2_holder;
auto permit2_fut = semaphore.with_permit(schema, test_name.c_str(), 1024, timeout, {}, permit2_holder, [] (reader_permit) {
BOOST_FAIL("unexpected call to with permit lambda");
return make_ready_future<>();
});
// permit1 is now the blessed one
// Triggers maybe_admit_waiters()
units1.reset_to_zero();
auto permit2_units2_fut = permit2.request_memory(1024);
BOOST_REQUIRE(!permit2_units2_fut.available());
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1);
BOOST_REQUIRE(eventually_true([&] { return permit2_fut.failed(); }));
BOOST_REQUIRE_THROW(permit2_fut.get(), named_semaphore_aborted);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
permit1_units.reset_to_zero();
const auto futures_failed = eventually_true([&] { return permit2_units2_fut.failed(); });
BOOST_CHECK(futures_failed);
BOOST_CHECK_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
simple_schema ss;
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(ss.schema(), permit2));
BOOST_REQUIRE_THROW(permit2_units2_fut.get(), named_semaphore_aborted);
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(schema, *permit2_holder));
BOOST_CHECK(!irh);
}
/// Test that if no count resources are currently used, a single permit is always admitted regardless of available memory.
@@ -2555,4 +2556,43 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
}
}
// Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-1016
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_preemptive_abort_requested_memory_leak) {
const ssize_t memory = 1024;
const uint32_t serialize_limit_multiplier = 2;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(),
2, // count
memory,
100, // max queue length
utils::updateable_value(serialize_limit_multiplier),
utils::updateable_value(std::numeric_limits<uint32_t>::max()), // kill limit multiplier
utils::updateable_value<uint32_t>(1), // cpu concurrency
utils::updateable_value<float>(1.0f)); // preemptive abort factor
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, "permit1", memory/2, db::no_timeout, {}).get();
reader_permit_opt permit2 = semaphore.obtain_permit(nullptr, "permit2", memory/2, db::timeout_clock::now() + 60s, {}).get();
auto units1 = permit1.request_memory(memory * serialize_limit_multiplier).get();
auto mem_fut = permit2->request_memory(1024);
BOOST_REQUIRE(!mem_fut.available());
// Triggers maybe_admit_waiters()
units1.reset_to_zero();
// Consume mem_fut to properly account for the 1024 bytes consumed by
// on_granted_memory(). In debug mode, the .then() continuation that creates
// the resource_units may be deferred due to yielding, so we must .get() it
// to ensure the resource_units is created and can be properly destroyed.
{ auto u = mem_fut.get(); }
// on_granted_memory() consumes stale _requested_memory (1024) + 512,
// but resource_units only tracks 512 — the difference leaks.
{ auto u = permit2->request_memory(512).get(); }
// Shouldn't fail if SCYLLADB-1016 is fixed.
permit2 = {};
}
BOOST_AUTO_TEST_SUITE_END()


@@ -61,7 +61,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
session = c.connect()
logging.info("Performing SELECT(*) FROM system.clients")
res = session.execute("SELECT COUNT(*) FROM system.clients WHERE connection_stage = 'AUTHENTICATING' ALLOW FILTERING;")
count = res[0][0]
count = res.one()[0]
logging.info(f"Observed {count} AUTHENTICATING connections...")
if count >= num_connections/2:
connections_observed = True


@@ -11,7 +11,7 @@ import pytest
import time
import random
from test.pylib.manager_client import ManagerClient
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.cluster.object_store.conftest import format_tuples
from test.cluster.util import wait_for_cql_and_get_hosts, get_replication, new_test_keyspace
from test.pylib.rest_client import read_barrier
@@ -19,6 +19,7 @@ from test.pylib.util import unique_name, wait_all
from cassandra.cluster import ConsistencyLevel
from collections import defaultdict
from test.pylib.util import wait_for
from test.pylib.rest_client import HTTPError
import statistics
logger = logging.getLogger(__name__)
@@ -174,9 +175,8 @@ async def test_backup_endpoint_config_is_live_updateable(manager: ManagerClient,
assert status is not None
assert status['state'] == 'done'
async def do_test_backup_abort(manager: ManagerClient, object_storage,
breakpoint_name, min_files, max_files = None):
async def do_test_backup_helper(manager: ManagerClient, object_storage,
breakpoint_name, handler, num_servers: int = 1):
'''helper for backup abort testing'''
objconf = object_storage.create_endpoint_conf()
@@ -186,10 +186,9 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
server = await manager.server_add(config=cfg, cmdline=cmd)
server = (await manager.servers_add(num_servers, config=cfg, cmdline=cmd))[0]
ks, cf = create_ks_and_cf(manager.get_cql())
snap_name, files = await take_snapshot_on_one_server(ks, server, manager, logger)
assert len(files) > 1
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
@@ -206,26 +205,35 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
print(f'Started task {tid}, aborting it early')
await log.wait_for(breakpoint_name + ': waiting', from_mark=mark)
await manager.api.abort_task(server.ip_addr, tid)
await manager.api.message_injection(server.ip_addr, breakpoint_name)
status = await manager.api.wait_task(server.ip_addr, tid)
print(f'Status: {status}')
assert (status is not None) and (status['state'] == 'failed')
assert "seastar::abort_requested_exception (abort requested)" in status['error']
await handler(server, prefix, files, tid)
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
uploaded_count = 0
for f in files:
in_backup = f'{prefix}/{f}' in objects
print(f'Check {f} is in backup: {in_backup}')
if in_backup:
uploaded_count += 1
# Note: since s3 client is abortable and run async, we might fail even the first file
# regardless of if we set the abort status before or after the upload is initiated.
# Parallelism is a pain.
assert min_files <= uploaded_count < len(files)
assert max_files is None or uploaded_count < max_files
async def do_test_backup_abort(manager: ManagerClient, object_storage,
breakpoint_name, min_files, max_files = None):
'''helper for backup abort testing'''
async def abort_and_check(server, prefix, files, tid):
assert len(files) > 1
await manager.api.abort_task(server.ip_addr, tid)
await manager.api.message_injection(server.ip_addr, breakpoint_name)
status = await manager.api.wait_task(server.ip_addr, tid)
print(f'Status: {status}')
assert (status is not None) and (status['state'] == 'failed')
assert "seastar::abort_requested_exception (abort requested)" in status['error']
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
uploaded_count = 0
for f in files:
in_backup = f'{prefix}/{f}' in objects
print(f'Check {f} is in backup: {in_backup}')
if in_backup:
uploaded_count += 1
# Note: since s3 client is abortable and run async, we might fail even the first file
# regardless of if we set the abort status before or after the upload is initiated.
# Parallelism is a pain.
assert min_files <= uploaded_count < len(files)
assert max_files is None or uploaded_count < max_files
await do_test_backup_helper(manager, object_storage, breakpoint_name, abort_and_check)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@@ -456,7 +464,8 @@ class topo:
self.racks = racks
self.dcs = dcs
async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, object_storage=None):
async def create_cluster(topology, manager, logger, object_storage=None):
rf_rack_valid_keyspaces = (topology.rf <= topology.racks)
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
cfg = {'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
@@ -679,19 +688,19 @@ class SSTablesOnObjectStorage:
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
@pytest.mark.parametrize("topology", [
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
])
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology_rf_validity):
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology):
'''Check that restoring of a cluster with stream scopes works'''
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnObjectStorage(object_storage))
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnObjectStorage(object_storage))
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity, sstables_storage):
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology, sstables_storage):
'''
This test creates a cluster specified by the topology parameter above,
with a configurable number of nodes, racks, datacenters, and replication factor.
@@ -707,9 +716,7 @@ async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topo
This stage parses the logs and checks that the data was streamed to nodes within the configured scope.
'''
topology, rf_rack_valid_keyspaces = topology_rf_validity
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, sstables_storage.object_storage)
servers, host_ids = await create_cluster(topology, manager, logger, sstables_storage.object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -858,7 +865,8 @@ async def test_backup_broken_streaming(manager: ManagerClient, s3_storage):
res = cql.execute(f"SELECT COUNT(*) FROM {keyspace}.{table} BYPASS CACHE USING TIMEOUT 600s;")
assert res[0].count == expected_rows, f"number of rows after restore is incorrect: {res[0].count}"
row = res.one()
assert row.count == expected_rows, f"number of rows after restore is incorrect: {row.count}"
log = await manager.server_open_log(server.server_id)
await log.wait_for("fully contained SSTables to local node from object storage", timeout=10)
# just make sure we had partially contained sstables as well
@@ -878,7 +886,7 @@ async def test_restore_primary_replica_same_domain(manager: ManagerClient, objec
ks = 'ks'
cf = 'cf'
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -940,7 +948,7 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
ks = 'ks'
cf = 'cf'
servers, host_ids = await create_cluster(topology, True if domain == 'rack' else False, manager, logger, object_storage)
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -971,3 +979,39 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
streamed_to = set([ r[1].group(1) for r in res ])
logger.info(f'{s.ip_addr} {host_ids[s.server_id]} streamed to {streamed_to}')
assert len(streamed_to) == 2
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_decommision_waits_for_backup(manager: ManagerClient, object_storage):
'''check that backing up a snapshot for a keyspace blocks decommission'''
async def decommission_and_check(server: ServerInfo, prefix: str, files, tid):
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
async def finish_backup():
# wait for snapshot to stop on waiting for backup
await log.wait_for("Waiting for snapshot/backup tasks to finish", from_mark=mark)
mark2 = await log.mark()
# let the backup run and finish
await manager.api.message_injection(server.ip_addr, "backup_task_pre_upload")
status = await manager.api.wait_task(server.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
uploaded_count = 0
# all files should be uploaded. note: can be zero due to two nodes
for f in files:
in_backup = f'{prefix}/{f}' in objects
print(f'Check {f} is in backup: {in_backup}')
if in_backup:
uploaded_count += 1
assert uploaded_count == len(files)
# Now wait for decommission to finish
await log.wait_for("DECOMMISSIONING: disabled backup and snapshots", from_mark=mark2)
await asyncio.gather(manager.decommission_node(server.server_id), finish_backup())
await do_test_backup_helper(manager, object_storage, "backup_task_pre_upload", decommission_and_check, 2)

View File

@@ -44,7 +44,6 @@ async def test_gossiper_empty_self_id_on_shadow_round(manager: ManagerClient):
logging.info("Starting cluster normally")
node1 = await manager.server_add(cmdline=cmdline, start=False, config=cfg)
manager.server_add(cmdline=cmdline, start=False)
node1_log = await manager.server_open_log(node1.server_id)
node2 = await manager.server_add(cmdline=cmdline, start=False, seeds=[node1.ip_addr])
node2_log = await manager.server_open_log(node2.server_id)

View File

@@ -7,6 +7,7 @@ import logging
import pytest
import asyncio
from test.pylib.internal_types import ServerNum
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, InjectionHandler, read_barrier
from test.cluster.util import create_new_test_keyspace
@@ -19,6 +20,20 @@ def fixture_raft_op_timeout(build_mode):
return 10000 if build_mode == 'debug' else 1000
async def update_group0_raft_op_timeout(server_id: ServerNum, manager: ManagerClient, timeout: int) -> None:
logger.info(f"Updating group0_raft_op_timeout_in_ms on server {server_id} to {timeout}")
running_ids = [srv.server_id for srv in await manager.running_servers()]
if server_id in running_ids:
# If the node is alive, server_update_config only sends the SIGHUP signal to the Scylla process, so awaiting it
# doesn't guarantee that the new config file is active. Work around this by looking at the logs.
log_file = await manager.server_open_log(server_id)
mark = await log_file.mark()
await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout)
await log_file.wait_for("completed re-reading configuration file", from_mark=mark, timeout=60)
else:
await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
@@ -41,7 +56,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
config = {
'direct_failure_detector_ping_timeout_in_ms': 300,
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -63,6 +77,10 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
manager.server_stop_gracefully(servers[3].server_id),
manager.server_stop_gracefully(servers[4].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout)
for srv in servers[:2]))
logger.info("starting a sixth node with no quorum")
await manager.server_add(expected_error="raft operation \\[read_barrier\\] timed out, there is no raft quorum",
timeout=60)
@@ -75,7 +93,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_timeout: int) -> None:
config = {
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -106,6 +123,9 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
manager.server_stop_gracefully(servers[2].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
logger.info("release join-node-before-add-entry injection")
await injection_handler.message()
@@ -125,7 +145,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
logger.info("adding a fourth node")
servers += [await manager.server_add(config={
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -152,6 +171,9 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
manager.server_stop_gracefully(servers[2].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await update_group0_raft_op_timeout(servers[3].server_id, manager, raft_op_timeout)
logger.info("release join-node-response_handler-before-read-barrier injection")
injection_handler = InjectionHandler(manager.api,
'join-node-response_handler-before-read-barrier',
@@ -168,7 +190,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: int) -> None:
logger.info("starting a first node (the leader)")
servers = [await manager.server_add(config={
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -188,6 +209,9 @@ async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: in
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
manager.server_stop_gracefully(servers[2].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
logger.info("attempting removenode for the second node")
await manager.remove_node(servers[0].server_id, servers[1].server_id,
expected_error="raft operation [read_barrier] timed out, there is no raft quorum",
@@ -231,9 +255,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
await asyncio.gather(*(manager.server_stop(srv.server_id) for srv in servers))
# This ensures the read barriers below fail quickly without group 0 quorum.
logger.info(f"Decreasing group0_raft_op_timeout_in_ms on {servers}")
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', raft_op_timeout)
for srv in servers))
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout) for srv in servers))
logger.info(f"Restarting {servers[:2]} with no group 0 quorum")
for idx, srv in enumerate(servers[:2]):
@@ -245,8 +267,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
# Increase the timeout back to 300s to ensure the new group 0 leader is elected before the first read barrier below
# times out.
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', 300000)
for srv in servers))
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, 300000) for srv in servers))
logger.info(f"Restarting {servers[2:]} with group 0 quorum")
for srv in servers[2:]:

View File

@@ -75,16 +75,16 @@ class SSTablesOnLocalStorage:
await asyncio.gather(*(self.refresh_one(manager, s, ks, cf, sstables, scope, primary_replica_only) for s, sstables in sstables_per_server.items()))
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
@pytest.mark.parametrize("topology", [
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
])
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology):
'''Check that refreshing of a cluster with stream scopes works'''
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnLocalStorage())
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnLocalStorage())
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
@@ -94,7 +94,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)
servers, host_ids = await create_cluster(topology, True, manager, logger)
servers, host_ids = await create_cluster(topology, manager, logger)
cql = manager.get_cql()

View File

@@ -71,7 +71,7 @@ async def test_snapshot_on_all_nodes(manager: ManagerClient):
"""
topology = topo(rf = 3, nodes = 3, racks = 3, dcs = 1)
servers, _ = await create_cluster(topology, True, manager, logger)
servers, _ = await create_cluster(topology, manager, logger)
snapshot_name = unique_name('snap_')

View File

@@ -182,7 +182,7 @@ async def test_tombstone_gc_insert_flush(manager: ManagerClient):
async def test_tablet_manual_repair_all_tokens(manager: ManagerClient):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
token = "all"
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
map1 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
await guarantee_repair_time_next_second()

View File

@@ -1324,7 +1324,7 @@ async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, n
return servers
class TestContext:
class Context:
def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
self.ks = ks
self.table = table
@@ -1347,7 +1347,7 @@ async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
yield TestContext(ks, table, rf, initial_tablets, num_keys)
yield Context(ks, table, rf, initial_tablets, num_keys)
finally:
await cql.run_async(f"DROP KEYSPACE {ks}")

View File

@@ -53,6 +53,11 @@ filterwarnings =
ignore::DeprecationWarning:importlib._bootstrap
ignore::DeprecationWarning:botocore
ignore::DeprecationWarning:pytest_elk_reporter
ignore::sqlalchemy.exc.MovedIn20Warning:kmip.pie.sqltypes
ignore::cryptography.utils.CryptographyDeprecationWarning:kmip
ignore:TypeDecorator .* will not produce a cache key:sqlalchemy.exc.SAWarning:kmip
ignore:Exception in thread[\s\S]*kmip[\s\S]*shutdown:pytest.PytestUnhandledThreadExceptionWarning
tmp_path_retention_count = 1
tmp_path_retention_policy = failed

View File

@@ -1,108 +0,0 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <cstdint>
#include <cstddef>
#include <vector>
#include <algorithm>
namespace utils {
/// A Count-Min Sketch with 4-bit counters for frequency estimation.
///
/// Used by the W-TinyLFU cache admission policy to estimate access frequency.
/// Each counter is 4 bits (max value 15), and counters are packed 16 per
/// uint64_t word. The sketch uses 4 independent hash functions (rows) and
/// returns the minimum count across all rows for frequency estimation.
class count_min_sketch {
static constexpr size_t depth = 4;
static constexpr uint64_t reset_mask = 0x7777777777777777ULL;
// Hash seeds from splitmix64 sequence, chosen for low correlation between rows.
static constexpr uint64_t seeds[depth] = {
0x9e3779b97f4a7c15ULL,
0xbf58476d1ce4e5b9ULL,
0x94d049bb133111ebULL,
0xd6e8feb86659fd93ULL,
};
std::vector<uint64_t> _table;
size_t _width;
size_t _width_mask;
size_t _words_per_row;
static uint64_t mix(uint64_t key, uint64_t seed) noexcept {
uint64_t h = key * seed;
h ^= h >> 32;
h *= 0xd6e8feb86659fd93ULL;
h ^= h >> 32;
return h;
}
size_t counter_index(size_t row, uint64_t key) const noexcept {
return mix(key, seeds[row]) & _width_mask;
}
static uint8_t get_counter(uint64_t word, size_t pos) noexcept {
return (word >> (pos * 4)) & 0x0FULL;
}
size_t word_index(size_t row, size_t col) const noexcept {
return row * _words_per_row + col / 16;
}
public:
/// Construct a sketch with the given number of counters per row.
/// \param width_log2 Log base 2 of the number of counters per row.
/// Total memory is approximately depth * 2^width_log2 / 2 bytes.
explicit count_min_sketch(size_t width_log2 = 16)
: _width(size_t(1) << width_log2)
, _width_mask(_width - 1)
, _words_per_row(_width / 16)
{
_table.resize(depth * _words_per_row, 0);
}
void increment(uint64_t key) noexcept {
for (size_t row = 0; row < depth; ++row) {
size_t col = counter_index(row, key);
size_t wi = word_index(row, col);
size_t pos = col & 15;
uint8_t val = get_counter(_table[wi], pos);
if (val < 15) {
_table[wi] += (1ULL << (pos * 4));
}
}
}
uint8_t estimate(uint64_t key) const noexcept {
uint8_t min_val = 15;
for (size_t row = 0; row < depth; ++row) {
size_t col = counter_index(row, key);
size_t wi = word_index(row, col);
size_t pos = col & 15;
min_val = std::min(min_val, get_counter(_table[wi], pos));
}
return min_val;
}
/// Halve all counters (frequency decay / aging).
/// This is NOT a full clear — it preserves relative frequency ordering
/// while allowing the sketch to adapt to changing access patterns.
void decay() noexcept {
for (auto& word : _table) {
word = (word >> 1) & reset_mask;
}
}
size_t width() const noexcept { return _width; }
};
} // namespace utils
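For reference, the behavior of the sketch being removed above can be modeled in a few lines of Python — a plain illustrative re-implementation (class name and the unpacked per-row lists are mine; the C++ version packs 16 counters per `uint64_t` word instead):

```python
class CountMinSketch4:
    """Toy model of the removed 4-bit count-min sketch: 4 hash rows,
    counters saturate at 15, decay() halves everything."""
    DEPTH = 4
    SEEDS = (0x9e3779b97f4a7c15, 0xbf58476d1ce4e5b9,
             0x94d049bb133111eb, 0xd6e8feb86659fd93)
    MASK64 = (1 << 64) - 1

    def __init__(self, width_log2=16):
        self.width = 1 << width_log2
        self.width_mask = self.width - 1
        # One row of counters per hash function.
        self.rows = [[0] * self.width for _ in range(self.DEPTH)]

    def _mix(self, key, seed):
        # Mirrors the C++ mix(): multiply, xor-shift, multiply, xor-shift.
        h = (key * seed) & self.MASK64
        h ^= h >> 32
        h = (h * 0xd6e8feb86659fd93) & self.MASK64
        h ^= h >> 32
        return h

    def increment(self, key):
        for row, seed in zip(self.rows, self.SEEDS):
            i = self._mix(key, seed) & self.width_mask
            if row[i] < 15:          # 4-bit counters saturate
                row[i] += 1

    def estimate(self, key):
        # Minimum across rows bounds the over-count from collisions.
        return min(row[self._mix(key, seed) & self.width_mask]
                   for row, seed in zip(self.rows, self.SEEDS))

    def decay(self):
        # Halve every counter, preserving relative frequency ordering.
        for row in self.rows:
            for i in range(len(row)):
                row[i] >>= 1
```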

View File

@@ -9,18 +9,8 @@
#pragma once
#include "utils/assert.hh"
#include "utils/count_min_sketch.hh"
#include <boost/intrusive/list.hpp>
#include <seastar/core/memory.hh>
#include <algorithm>
// Identifies which W-TinyLFU segment an evictable belongs to.
enum class lru_segment : uint8_t {
none = 0,
window = 1,
probation = 2,
protected_ = 3,
};
class evictable {
friend class lru;
@@ -42,11 +32,6 @@ protected:
static_assert(std::is_nothrow_constructible_v<lru_link_type, lru_link_type&&>);
private:
lru_link_type _lru_link;
// Stable key for frequency estimation in the Count-Min Sketch.
// Assigned when the entry is first added to the LRU, preserved across LSA
// compaction moves (which change the object address but preserve members).
uint32_t _frequency_hash = 0;
lru_segment _segment = lru_segment::none;
protected:
// Prevent destruction via evictable pointer. LRU is not aware of allocation strategy.
// Prevent destruction of a linked evictable. While we could unlink the evictable here
@@ -69,8 +54,6 @@ public:
void swap(evictable& o) noexcept {
_lru_link.swap_nodes(o._lru_link);
std::swap(_frequency_hash, o._frequency_hash);
std::swap(_segment, o._segment);
}
virtual bool is_index() const noexcept {
@@ -93,27 +76,13 @@ class index_evictable : public evictable {
}
};
// Implements W-TinyLFU cache replacement for row cache and sstable index cache.
//
// W-TinyLFU uses a small admission window backed by an LRU and a main cache
// organized as a Segmented LRU (SLRU) with probation and protected segments.
// Admission to the main cache is controlled by a TinyLFU frequency filter
// implemented via a Count-Min Sketch.
//
// New entries enter the window. When eviction is needed, the window victim
// competes with the probation victim: the entry with higher estimated
// frequency survives in probation while the other is evicted.
// Touching an entry in probation promotes it to the protected segment.
// When the protected segment exceeds its target size, the least-recently-used
// protected entry is demoted back to probation.
// Implements LRU cache replacement for row cache and sstable index cache.
class lru {
private:
using lru_type = boost::intrusive::list<evictable,
boost::intrusive::member_hook<evictable, evictable::lru_link_type, &evictable::_lru_link>,
boost::intrusive::constant_time_size<false>>; // we need this to have bi::auto_unlink on hooks.
lru_type _window;
lru_type _probation;
lru_type _protected;
lru_type _list;
// See the comment to index_evictable.
using index_lru_type = boost::intrusive::list<index_evictable,
@@ -123,225 +92,24 @@ private:
using reclaiming_result = seastar::memory::reclaiming_result;
static constexpr size_t sketch_width_log2 = 16;
static constexpr size_t sketch_width = size_t(1) << sketch_width_log2;
static constexpr size_t sample_threshold = sketch_width * 10;
// Window segment target: ~1% of total cache entries.
static constexpr size_t window_percent = 1;
// Protected segment target: ~80% of total cache entries.
static constexpr size_t protected_percent = 80;
utils::count_min_sketch _sketch{sketch_width_log2};
size_t _window_size = 0;
size_t _probation_size = 0;
size_t _protected_size = 0;
size_t _sample_count = 0;
// Monotonic counter for assigning stable frequency hash keys.
// Using object addresses as sketch keys is incorrect because LSA
// relocates objects during compaction, changing their address.
uint32_t _next_hash = 0;
size_t total_size() const noexcept {
return _window_size + _probation_size + _protected_size;
}
size_t max_window_size() const noexcept {
return std::max(size_t(1), total_size() * window_percent / 100);
}
size_t max_protected_size() const noexcept {
return total_size() * protected_percent / 100;
}
static uint64_t entry_key(const evictable& e) noexcept {
return e._frequency_hash;
}
void assign_frequency_hash(evictable& e) noexcept {
// Only assign a new hash if the entry doesn't have one yet.
// Re-added entries (after remove()) keep their existing hash
// to preserve frequency tracking across remove/add cycles.
if (e._frequency_hash == 0) {
// Skip 0 on wrap-around to keep it as the "unassigned" sentinel.
if (++_next_hash == 0) {
++_next_hash;
}
e._frequency_hash = _next_hash;
}
}
void record_access(const evictable& e) noexcept {
_sketch.increment(entry_key(e));
if (++_sample_count >= sample_threshold) {
_sketch.decay();
_sample_count /= 2;
}
}
lru_type& segment_list(lru_segment seg) noexcept {
switch (seg) {
case lru_segment::none:
SCYLLA_ASSERT(false && "segment_list called with none");
__builtin_unreachable();
case lru_segment::window: return _window;
case lru_segment::probation: return _probation;
case lru_segment::protected_: return _protected;
}
__builtin_unreachable();
}
void increment_size(lru_segment seg) noexcept {
switch (seg) {
case lru_segment::none: break;
case lru_segment::window: ++_window_size; break;
case lru_segment::probation: ++_probation_size; break;
case lru_segment::protected_: ++_protected_size; break;
}
}
void decrement_size(lru_segment seg) noexcept {
switch (seg) {
case lru_segment::none: break;
case lru_segment::window: --_window_size; break;
case lru_segment::probation: --_probation_size; break;
case lru_segment::protected_: --_protected_size; break;
}
}
void remove_from_segment(evictable& e) noexcept {
auto& list = segment_list(e._segment);
list.erase(list.iterator_to(e));
decrement_size(e._segment);
e._segment = lru_segment::none;
}
void add_to_segment(evictable& e, lru_segment seg) noexcept {
e._segment = seg;
segment_list(seg).push_back(e);
increment_size(seg);
}
// Move excess protected entries to probation.
// Bounded to avoid reactor stalls when the protected segment is
// significantly oversized (e.g. after many promotions without eviction).
static constexpr size_t max_rebalance_per_call = 128;
void rebalance_protected() noexcept {
size_t max_prot = max_protected_size();
size_t moved = 0;
while (_protected_size > max_prot && !_protected.empty() && moved < max_rebalance_per_call) {
evictable& victim = _protected.front();
remove_from_segment(victim);
add_to_segment(victim, lru_segment::probation);
++moved;
}
}
// Evicts a single element using W-TinyLFU policy.
template <bool Shallow = false>
reclaiming_result do_evict(bool should_evict_index) noexcept {
// Index eviction path: evict the least recently used index entry.
if (should_evict_index && !_index_list.empty()) {
evictable& e = _index_list.front();
remove(e);
if constexpr (!Shallow) {
e.on_evicted();
} else {
e.on_evicted_shallow();
}
return reclaiming_result::reclaimed_something;
}
if (_window.empty() && _probation.empty() && _protected.empty()) {
return reclaiming_result::reclaimed_nothing;
}
rebalance_protected();
// Drain excess from window using TinyLFU admission.
while (_window_size > max_window_size() && !_window.empty()) {
evictable& w_victim = _window.front();
if (!_probation.empty()) {
// Competition: window victim vs. probation victim.
evictable& p_victim = _probation.front();
uint8_t w_freq = _sketch.estimate(entry_key(w_victim));
uint8_t p_freq = _sketch.estimate(entry_key(p_victim));
if (w_freq >= p_freq) {
// Admit window victim to probation; evict probation victim.
remove_from_segment(w_victim);
add_to_segment(w_victim, lru_segment::probation);
remove(p_victim);
if constexpr (!Shallow) {
p_victim.on_evicted();
} else {
p_victim.on_evicted_shallow();
}
} else {
// Reject window victim.
remove(w_victim);
if constexpr (!Shallow) {
w_victim.on_evicted();
} else {
w_victim.on_evicted_shallow();
}
}
return reclaiming_result::reclaimed_something;
}
// Probation is empty: move window victim to probation and retry.
remove_from_segment(w_victim);
add_to_segment(w_victim, lru_segment::probation);
}
// Window is within target. Evict from probation, then window, then protected.
evictable* victim = nullptr;
if (!_probation.empty()) {
victim = &_probation.front();
} else if (!_window.empty()) {
victim = &_window.front();
} else if (!_protected.empty()) {
victim = &_protected.front();
} else {
return reclaiming_result::reclaimed_nothing;
}
remove(*victim);
if constexpr (!Shallow) {
victim->on_evicted();
} else {
victim->on_evicted_shallow();
}
return reclaiming_result::reclaimed_something;
}
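The eviction path being removed above can be summarized in a small Python model — illustrative only, with plain deques standing in for the intrusive lists and a caller-supplied `estimate` callable standing in for the sketch (all names are mine):

```python
from collections import deque

def evict_one(window, probation, protected_, estimate, max_window):
    """One eviction using W-TinyLFU admission. Each segment is a deque with
    the least-recently-used entry at the front. Returns the evicted key,
    or None if all segments are empty."""
    # Drain excess from the window using TinyLFU admission.
    while len(window) > max_window and window:
        w = window.popleft()                  # window victim
        if probation:
            p = probation[0]                  # probation victim
            if estimate(w) >= estimate(p):    # admit w to probation, evict p
                probation.popleft()
                probation.append(w)
                return p
            return w                          # reject the window victim
        probation.append(w)                   # probation empty: demote w, retry
    # Window within target: evict from probation, then window, then protected.
    for segment in (probation, window, protected_):
        if segment:
            return segment.popleft()
    return None
```

As in the C++ `do_evict()`, at most one entry is evicted per call; the loop only continues past a competition-free iteration when probation is empty and entries are merely moved.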
public:
~lru() {
auto drain = [this](lru_type& list) {
while (!list.empty()) {
evictable& e = list.front();
remove(e);
e.on_evicted();
}
};
drain(_window);
drain(_probation);
drain(_protected);
while (!_list.empty()) {
evictable& e = _list.front();
remove(e);
e.on_evicted();
}
}
void remove(evictable& e) noexcept {
auto& list = segment_list(e._segment);
list.erase(list.iterator_to(e));
decrement_size(e._segment);
e._segment = lru_segment::none;
_list.erase(_list.iterator_to(e));
if (e.is_index()) {
_index_list.erase(_index_list.iterator_to(static_cast<index_evictable&>(e)));
}
}
void add(evictable& e) noexcept {
assign_frequency_hash(e);
record_access(e);
add_to_segment(e, lru_segment::window);
_list.push_back(e);
if (e.is_index()) {
_index_list.push_back(static_cast<index_evictable&>(e));
}
@@ -349,52 +117,36 @@ public:
// Like add(e) but makes sure that e is evicted right before "more_recent" in the absence of later touches.
void add_before(evictable& more_recent, evictable& e) noexcept {
assign_frequency_hash(e);
record_access(e);
lru_segment seg = more_recent._segment;
auto& list = segment_list(seg);
list.insert(list.iterator_to(more_recent), e);
e._segment = seg;
increment_size(seg);
_list.insert(_list.iterator_to(more_recent), e);
}
// Handles access to an entry:
// - In window: moves to back of window.
// - In probation: promotes to protected.
// - In protected: moves to back of protected.
// - Not linked: adds to window.
void touch(evictable& e) noexcept {
record_access(e);
switch (e._segment) {
case lru_segment::none:
assign_frequency_hash(e);
add_to_segment(e, lru_segment::window);
break;
case lru_segment::window:
_window.erase(_window.iterator_to(e));
_window.push_back(e);
break;
case lru_segment::probation:
_probation.erase(_probation.iterator_to(e));
--_probation_size;
e._segment = lru_segment::protected_;
_protected.push_back(e);
++_protected_size;
break;
case lru_segment::protected_:
_protected.erase(_protected.iterator_to(e));
_protected.push_back(e);
break;
}
remove(e);
add(e);
}
// Evicts a single element using the W-TinyLFU policy.
// Evicts a single element from the LRU.
template <bool Shallow = false>
reclaiming_result do_evict(bool should_evict_index) noexcept {
if (_list.empty()) {
return reclaiming_result::reclaimed_nothing;
}
evictable& e = (should_evict_index && !_index_list.empty()) ? _index_list.front() : _list.front();
remove(e);
if constexpr (!Shallow) {
e.on_evicted();
} else {
e.on_evicted_shallow();
}
return reclaiming_result::reclaimed_something;
}
// Evicts a single element from the LRU.
reclaiming_result evict(bool should_evict_index = false) noexcept {
return do_evict<false>(should_evict_index);
}
// Evicts a single element using the W-TinyLFU policy.
// Evicts a single element from the LRU.
// Will call on_evicted_shallow() instead of on_evicted().
reclaiming_result evict_shallow() noexcept {
return do_evict<true>(false);