Compare commits

...

24 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
173fb1e6d3 Clarify audit all-keyspaces exclusivity
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 15:32:11 +00:00
copilot-swe-agent[bot]
e252bb1550 Revise audit all-keyspaces design
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 15:04:40 +00:00
copilot-swe-agent[bot]
5713b5efd1 Finalize audit design doc clarifications
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:49:48 +00:00
copilot-swe-agent[bot]
979ec5ada8 Polish audit design doc review feedback
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:47:26 +00:00
copilot-swe-agent[bot]
67503a350b Refine audit prototype design details
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:45:38 +00:00
copilot-swe-agent[bot]
a90490c3cf Clarify audit design doc semantics
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:44:28 +00:00
copilot-swe-agent[bot]
6f957ea4e0 Add audit prototype design doc
Co-authored-by: ptrsmrn <124208650+ptrsmrn@users.noreply.github.com>
2026-03-17 14:43:44 +00:00
copilot-swe-agent[bot]
f6605f7b66 Initial plan
2026-03-17 14:37:56 +00:00
Asias He
6cb263bab0 repair: Prevent CPU stall during cross-shard row copy and destruction
When handling `repair_stream_cmd::end_of_current_rows`, passing the
foreign list directly to `put_row_diff_handler` triggered a massive
synchronous deep copy on the destination shard. Additionally, destroying
the list triggered a synchronous deallocation on the source shard. This
blocked the reactor and triggered the CPU stall detector.

This commit fixes the issue by introducing `clone_gently()` to copy the
list elements one by one, and leveraging the existing
`utils::clear_gently()` to destroy them. Both utilize
`seastar::coroutine::maybe_yield()` to allow the reactor to breathe
during large cross-shard transfers and cleanups.

Fixes SCYLLADB-403

Closes scylladb/scylladb#28979
2026-03-17 11:05:15 +02:00
Botond Dénes
035aa90d4b Merge 'Alternator: add per-table batch latency metrics and test coverage' from Amnon Heiman
This series fixes a metrics visibility gap in Alternator and adds regression coverage.

Until now, BatchGetItem and BatchWriteItem updated global latency histograms but did not consistently update per-table latency histograms. As a result, table-level latency dashboards could miss batch traffic.

It updates the batch read/write paths to compute request duration once and record it in both global and per-table latency metrics.

It also adds the missing tests, including a metric-agnostic helper and a dedicated per-table latency test that verifies latency counters increase for item and batch operations.

This change is metrics-only (no API/behavior change for requests) and improves observability consistency between global and per-table views.

Fixes #28721

**We assume the alternator per-table metrics exist, but the batch ones are not updated**

Closes scylladb/scylladb#28732

* github.com:scylladb/scylladb:
  test(alternator): add per-table latency coverage for item and batch ops
  alternator: track per-table latency for batch get/write operations
2026-03-16 17:18:00 +02:00
Michał Hudobski
40d180a7ef docs: update vector search filtering to reflect primary key support only
Remove outdated references to filtering on columns provided in the
index definition, and remove the note about equal relations (= and IN)
being the only supported operations. Vector search filtering currently
supports WHERE clauses on primary key columns only.

Closes scylladb/scylladb#28949
2026-03-16 17:16:16 +02:00
Botond Dénes
9de8d6798e Merge 'reader_concurrency_semaphore: skip preemptive abort for permits waiting for memory' from Łukasz Paszkowski
Permits in the `waiting_for_memory` state represent already-executing reads that are blocked on memory allocation. Preemptively aborting them is wasteful -- these reads have already consumed resources and made progress, so they should be allowed to complete.

Restrict the preemptive abort check in maybe_admit_waiters() to only apply to permits in the `waiting_for_admission` state, and tighten the state validation in `on_preemptive_aborted()` accordingly.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1016

Backport not needed. The commit introducing replica load shedding is not part of 2026.1

Closes scylladb/scylladb#29025

* github.com:scylladb/scylladb:
  reader_concurrency_semaphore: skip preemptive abort for permits waiting for memory
  reader_concurrency_semaphore_test: detect memory leak on preemptive abort of waiting_for_memory permit
2026-03-16 17:14:25 +02:00
Calle Wilund
a5df2e79a7 storage_service: Wait for snapshot/backup before decommission
Fixes: SCYLLADB-244

Disables snapshot control such that any active ops finish/fail
before proceeding with decommission.
Note: snapshot control provided as argument, not member ref
due to storage_service being used from both main and cql_test_env.
(The latter has no snapshot_ctl to provide).

Could do the snapshot lockout on API level, but want to do
pre-checks before this.

Note: this just disables backup/snapshot fully. Could re-enable
after decommission, but this seems somewhat pointless.

v2:
* Add log message to snapshot shutdown
* Make test use log waiting instead of timeouts

Closes scylladb/scylladb#28980
2026-03-16 17:12:57 +02:00
bitpathfinder
85d5073234 test: Fix non-awaited coroutine in test_gossiper_empty_self_id_on_shadow_round
The line containing the non-awaited coroutine was not actually needed, so it has been removed.

Fixes: SCYLLADB-906

Closes scylladb/scylladb#28884
2026-03-16 17:07:36 +02:00
Botond Dénes
3e4e0c57b8 Merge 'Relax rf-rack-valid-keyspace option in backup/restore tests' from Pavel Emelyanov
Some tests, when creating a cluster, configure nodes with the rf-rack-valid option, because they sometimes want it OFF. For that, the option is explicitly carried around, but the cluster-creating helper can infer this option itself from the provided topology and replication factor.

Removing this option simplifies the code and (a nicer outcome) the test "signature" that is used, e.g., on the command line to run a specific test.

Improving tests, not backporting

Closes scylladb/scylladb#28860

* github.com:scylladb/scylladb:
  test: Relax topology_rf_validity parameter for some tests
  test: Auto detect rf-rack-valid option in create_cluster()
2026-03-16 17:06:46 +02:00
Patryk Jędrzejczak
526e5986fe test: test_raft_no_quorum: decrease group0_raft_op_timeout_in_ms after quorum loss
`test_raft_no_quorum.py::test_cannot_add_new_node` is currently flaky in dev
mode. The bootstrap of the first node can fail due to `add_entry()` timing
out (with the 1s timeout set by the test case).

Other test cases in this test file could fail in the same way as well, so we
need a general fix. We don't want to increase the timeout in dev mode, as it
would slow down the test. The solution is to keep the timeout unchanged, but
set it only after quorum is lost. This prevents unexpected timeouts of group0
operations with almost no impact on the test running time.

A note about the new `update_group0_raft_op_timeout` function: waiting for
the log seems to be necessary only for
`test_quorum_lost_during_node_join_response_handler`, but let's do it
for all test cases just in case (including `test_can_restart` that shouldn't
be flaky currently).

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-913

Closes scylladb/scylladb#28998
2026-03-16 16:58:15 +02:00
Dani Tweig
bc0952781a Update Jira sync calling workflow to consolidated view
Replaced multiple per-action workflow jobs with a single consolidated
call to main_pr_events_jira_sync.yml. Added 'edited' event trigger.
This makes CI actions in PRs more readable and workflow execution faster.

Fixes: PM-253

Closes scylladb/scylladb#29042
2026-03-16 08:25:32 +02:00
Artsiom Mishuta
755d528135 test.py: fix warnings
Changes in this commit:

1) Rename class 'TestContext' to 'Context' so pytest will not consider this class as a test.

2) Extend the pytest filterwarnings list to ignore warnings from external libs.

3) Use datetime.datetime.now(datetime.UTC) instead of datetime.datetime.utcnow().

4) Use ResultSet.one() instead of ResultSet[0].

Fixes SCYLLADB-904
Fixes SCYLLADB-908
Related SCYLLADB-902

Closes scylladb/scylladb#28956
2026-03-15 12:00:10 +02:00
Pavel Emelyanov
d544d8602d test: Relax topology_rf_validity parameter for some tests
Tests that call create_cluster() helper no longer need to carry the
rf-validity parameter. This simplifies the code and test signature.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2026-03-13 14:30:32 +03:00
Pavel Emelyanov
313985fed7 test: Auto detect rf-rack-valid option in create_cluster()
The helper accepts it as a boolean argument, but it can easily infer
one from the provided topology.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2026-03-13 14:30:32 +03:00
Łukasz Paszkowski
4c4d043a3b reader_concurrency_semaphore: skip preemptive abort for permits waiting for memory
Permits in the `waiting_for_memory` state represent already-executing
reads that are blocked on memory allocation. Preemptively aborting
them is wasteful -- these reads have already consumed resources and
made progress, so they should be allowed to complete.

Restrict the preemptive abort check in maybe_admit_waiters() to only
apply to permits in the `waiting_for_admission` state, and tighten
the state validation in `on_preemptive_aborted()` accordingly.

Adjust the following tests:
+ test_reader_concurrency_semaphore_abort_preemptively_aborted_permit
  no longer relies on requesting memory
+ test_reader_concurrency_semaphore_preemptive_abort_requested_memory_leak
  adjusted to the fix

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1016
2026-03-13 09:50:05 +01:00
Andrzej Jackowski
3b9cd52a95 reader_concurrency_semaphore_test: detect memory leak on preemptive abort of waiting_for_memory permit
A permit in `waiting_for_memory` state can be preemptively aborted by
maybe_admit_waiters(). This is wrong: such permits have already been
admitted and are actively processing a read — they are merely blocked
waiting for memory under serialize-limit pressure.

When `on_preemptive_aborted()` fires on a `waiting_for_memory` permit,
it does not clear `_requested_memory`. A subsequent `request_memory()`
call accumulates on top of the stale value, causing `on_granted_memory()`
to consume more than resource_units tracks.

This commit adds a test that confirms that scenario by counting
internal_errors.
2026-03-12 17:09:34 +01:00
Amnon Heiman
aca5284b13 test(alternator): add per-table latency coverage for item and batch ops
Add missing tests for per-table Alternator latency metrics to ensure recent
per-table latency accounting is actually validated.

Changes in this patch:

- Refactor the latency assertion helper into check_sets_latency_by_metric(),
  parameterized by metric name.
- Keep existing behavior by implementing check_sets_latency() as a wrapper
  over scylla_alternator_op_latency.
- Add test_item_latency_per_table() to verify
  scylla_alternator_table_op_latency_count increases for
  PutItem, GetItem, DeleteItem, UpdateItem, BatchWriteItem,
  and BatchGetItem.
This closes a test gap where only global latency metrics were checked, while
per-table latency metrics were not covered.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
2026-02-25 20:51:18 +02:00
Amnon Heiman
29e0b4e08c alternator: track per-table latency for batch get/write operations
Batch operations were updating only global latency histograms, which left
table-level latency metrics incomplete.

This change computes the request duration once at the end of each operation and
reuses it to update both global and per-table latency stats: latencies are
recorded for each table used in the request.

This aligns batch read/write metric behavior with other operations and improves
per-table observability.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
2026-02-25 20:51:18 +02:00
26 changed files with 607 additions and 230 deletions

View File

@@ -1,8 +1,8 @@
name: Sync Jira Based on PR Events
on:
pull_request_target:
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
types: [opened, edited, ready_for_review, review_requested, labeled, unlabeled, closed]
permissions:
contents: read
@@ -10,32 +10,9 @@ permissions:
issues: write
jobs:
jira-sync-pr-opened:
if: github.event.action == 'opened'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-in-review:
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-add-label:
if: github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-remove-label:
if: github.event.action == 'unlabeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-pr-closed:
if: github.event.action == 'closed'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
jira-sync:
uses: scylladb/github-automation/.github/workflows/main_pr_events_jira_sync.yml@main
with:
caller_action: ${{ github.event.action }}
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -3463,7 +3463,11 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
if (should_add_wcu) {
rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
}
_stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
auto duration = std::chrono::steady_clock::now() - start_time;
_stats.api_operations.batch_write_item_latency.mark(duration);
for (const auto& w : per_table_wcu) {
w.first->api_operations.batch_write_item_latency.mark(duration);
}
co_return rjson::print(std::move(ret));
}
@@ -4974,7 +4978,12 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
if (!some_succeeded && eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
}
_stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
auto duration = std::chrono::steady_clock::now() - start_time;
_stats.api_operations.batch_get_item_latency.mark(duration);
for (const table_requests& rs : requests) {
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->api_operations.batch_get_item_latency.mark(duration);
}
if (is_big(response)) {
co_return make_streamed(std::move(response));
} else {

View File

@@ -122,9 +122,9 @@ future<> unset_thrift_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_thrift_controller(ctx, r); });
}
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
return ctx.http_server.set_routes([&ctx, &ss, &group0_client] (routes& r) {
set_storage_service(ctx, r, ss, group0_client);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
return ctx.http_server.set_routes([&ctx, &ss, &ssc, &group0_client] (routes& r) {
set_storage_service(ctx, r, ss, ssc, group0_client);
});
}

View File

@@ -98,7 +98,7 @@ future<> set_server_config(http_context& ctx, db::config& cfg);
future<> unset_server_config(http_context& ctx);
future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
future<> unset_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
future<> unset_server_storage_service(http_context& ctx);
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
future<> unset_server_client_routes(http_context& ctx);

View File

@@ -835,9 +835,9 @@ rest_force_keyspace_flush(http_context& ctx, std::unique_ptr<http::request> req)
static
future<json::json_return_type>
rest_decommission(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
rest_decommission(sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, std::unique_ptr<http::request> req) {
apilog.info("decommission");
return ss.local().decommission().then([] {
return ss.local().decommission(ssc).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
}
@@ -1782,7 +1782,7 @@ rest_bind(FuncType func, BindArgs&... args) {
return std::bind_front(func, std::ref(args)...);
}
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
ss::toppartitions_generic.set(r, rest_bind(rest_toppartitions_generic, ctx));
ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
@@ -1799,7 +1799,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
ss::decommission.set(r, rest_bind(rest_decommission, ss));
ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc));
ss::move.set(r, rest_bind(rest_move, ss));
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
@@ -2141,6 +2141,7 @@ void unset_snapshot(http_context& ctx, routes& r) {
ss::start_backup.unset(r);
cf::get_true_snapshots_size.unset(r);
cf::get_all_true_snapshots_size.unset(r);
ss::decommission.unset(r);
}
}

View File

@@ -66,7 +66,7 @@ struct scrub_info {
scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::request> req);
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, service::raft_group0_client&);
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
void unset_storage_service(http_context& ctx, httpd::routes& r);
void set_sstables_loader(http_context& ctx, httpd::routes& r, sharded<sstables_loader>& sst_loader);
void unset_sstables_loader(http_context& ctx, httpd::routes& r);

View File

@@ -39,10 +39,19 @@ snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::stor
}
future<> snapshot_ctl::stop() {
co_await _ops.close();
co_await disable_all_operations();
co_await _task_manager_module->stop();
}
future<> snapshot_ctl::disable_all_operations() {
if (!_ops.is_closed()) {
if (_ops.get_count()) {
snap_log.info("Waiting for snapshot/backup tasks to finish");
}
co_await _ops.close();
}
}
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
auto& ks = _db.local().find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {

View File

@@ -120,6 +120,8 @@ public:
future<int64_t> true_snapshots_size();
future<int64_t> true_snapshots_size(sstring ks, sstring cf);
future<> disable_all_operations();
private:
config _config;
sharded<replica::database>& _db;

View File

@@ -292,8 +292,8 @@ For example::
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
or columns provided in a definition of the index.
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
See :ref:`WHERE <where-clause>`.
For example::
@@ -301,10 +301,6 @@ For example::
WHERE user_id = 'user123'
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
The supported operations are equal relations (``=`` and ``IN``) with restrictions as in regular ``WHERE`` clauses. See :ref:`WHERE <where-clause>`.
Other filtering scenarios are currently not supported.
.. note::
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.

View File

@@ -1,111 +1,347 @@
# Introduction
# Prototype design: auditing all keyspaces and per-role auditing
Similar to the approach described in CASSANDRA-12151, we add the
concept of an audit specification. An audit has a target (syslog or a
table) and a set of events/actions that it wants recorded. We
introduce new CQL syntax for Scylla users to describe and manipulate
audit specifications.
## Summary
Prior art:
- Microsoft SQL Server [audit
description](https://docs.microsoft.com/en-us/sql/relational-databases/security/auditing/sql-server-audit-database-engine?view=sql-server-ver15)
- pgAudit [docs](https://github.com/pgaudit/pgaudit/blob/master/README.md)
- MySQL audit_log docs in
[MySQL](https://dev.mysql.com/doc/refman/8.0/en/audit-log.html) and
[Azure](https://docs.microsoft.com/en-us/azure/mysql/concepts-audit-logs)
- DynamoDB can [use CloudTrail](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/logging-using-cloudtrail.html) to log all events
Extend the existing `scylla.yaml`-driven audit subsystem with two focused capabilities:
# CQL extensions
1. allow auditing **all keyspaces** without enumerating them one by one
2. allow auditing only a configured set of **roles**
## Create an audit
The prototype should stay close to the current implementation in `audit/`:
```cql
CREATE AUDIT [IF NOT EXISTS] audit-name WITH TARGET { SYSLOG | table-name }
[ AND TRIGGER KEYSPACE IN (ks1, ks2, ks3) ]
[ AND TRIGGER TABLE IN (tbl1, tbl2, tbl3) ]
[ AND TRIGGER ROLE IN (usr1, usr2, usr3) ]
[ AND TRIGGER CATEGORY IN (cat1, cat2, cat3) ]
;
- keep the existing backends (`table`, `syslog`, or both)
- keep the existing category / keyspace / table filters
- preserve live updates for audit configuration
- avoid any schema change to `audit.audit_log`
This is intentionally a small extension of the current auditing model, not a redesign around new CQL statements such as `CREATE AUDIT`.
## Motivation
Today Scylla exposes three main audit selectors:
- `audit_categories`
- `audit_tables`
- `audit_keyspaces`
This leaves two operational gaps:
1. **Auditing all keyspaces is cumbersome.**
Large installations may create keyspaces dynamically, or manage many tenant keyspaces. Requiring operators to keep
`audit_keyspaces` synchronized with the full keyspace list is error-prone and defeats the point of cluster-wide auditing.
2. **Auditing is all-or-nothing with respect to users.**
Once a category/keyspace/table combination matches, any authenticated user generating that traffic is audited.
Operators want to narrow the scope to specific tenants, service accounts, or privileged roles.
These two additions also work well together: "audit all keyspaces, but only for selected roles" is a practical way to reduce
both audit volume and performance impact.
## Goals
- Add a way to express "all keyspaces" in the current configuration model.
- Add a new role filter that limits auditing to selected roles.
- Preserve backwards compatibility for existing configurations.
- Keep the evaluation cheap on the request path.
- Support live configuration updates, consistent with the existing audit options.
## Non-goals
- Introducing `CREATE AUDIT`, `ALTER AUDIT`, or other new CQL syntax.
- Adding per-role audit destinations.
- Adding different categories per role.
- Expanding role matching through the full granted-role graph in the prototype.
- Changing the on-disk audit table schema.
## Current behavior
At the moment, audit logging is controlled by:
- `audit`
- `audit_categories`
- `audit_tables`
- `audit_keyspaces`
The current decision rule in `audit::should_log()` is effectively:
```text
category matches
&& (
keyspace is listed in audit_keyspaces
|| table is listed in audit_tables
|| category in {AUTH, ADMIN, DCL}
)
```
From this point on, every database event that matches all present
triggers will be recorded in the target. When the target is a table,
it behaves like the [current
design](https://docs.scylladb.com/operating-scylla/security/auditing/#table-storage).
Observations:
The audit name must be different from all other audits, unless IF NOT
EXISTS precedes it, in which case the existing audit must be identical
to the new definition. Case sensitivity and length limit are the same
as for table names.
- `AUTH`, `ADMIN`, and `DCL` are already global once their category is enabled.
- `DDL`, `DML`, and `QUERY` need a matching keyspace or table.
- An empty `audit_keyspaces` means "audit no keyspaces", not "audit every keyspace".
- There is no role-based filter; the authenticated user is recorded in the log but is not part of the decision.
- The exact implementation to preserve is in `audit/audit.cc` (`should_log()`, `inspect()`, and `inspect_login()`).
A trigger kind (ie, `KEYSPACE`, `TABLE`, `ROLE`, or `CATEGORY`) can be
specified at most once.
## Proposed configuration
## Show an audit
### 1. Add `audit_all_keyspaces`
```cql
DESCRIBE AUDIT [audit-name ...];
Introduce a new live-update boolean option:
Examples:
```yaml
# Audit all keyspaces for matching categories
audit_all_keyspaces: true
# Audit all keyspaces for selected roles
audit_all_keyspaces: true
audit_roles: "alice,bob"
```
Prints definitions of all audits named herein. If no names are
provided, prints all audits.
Semantics:
## Delete an audit
- `audit_all_keyspaces: false` keeps the existing behavior.
- `audit_all_keyspaces: true` makes every keyspace match.
- `audit_keyspaces` keeps its existing meaning: an explicit list of keyspaces, or no keyspace-wide auditing when left empty.
- `audit_all_keyspaces: true` and a non-empty `audit_keyspaces` must be rejected as invalid configuration,
because the two options express overlapping scope in different ways.
- A dedicated boolean is preferable to overloading `audit_keyspaces`, because it avoids changing the meaning of existing configurations.
- This also keeps the behavior aligned with today's `audit_tables` handling, where leaving `audit_tables` empty does not introduce a new wildcard syntax.
```cql
DROP AUDIT audit-name;
### 2. Add `audit_roles`
Introduce a new live-update configuration option:
```yaml
audit_roles: "alice,bob,service_api"
```
Stops logging events specified by this audit. Doesn't impact the
already logged events. If the target is a table, it remains as it is.
Semantics:
## Alter an audit
- empty `audit_roles` means **no role filtering**, preserving today's behavior
- non-empty `audit_roles` means audit only requests whose effective logged username matches one of the configured roles
- matching is byte-for-byte exact, using the same role name that is already written to the audit record's `username` column / syslog field
- the prototype should compare against the post-authentication role name from the session and audit log,
with no additional case folding or role-graph expansion
```cql
ALTER AUDIT audit-name WITH {same syntax as CREATE}
Examples:
```yaml
# Audit all roles in a single keyspace (current behavior, made explicit)
audit_keyspaces: "ks1"
audit_roles: ""
# Audit two roles across all keyspaces
audit_all_keyspaces: true
audit_roles: "alice,bob"
# Audit a service role, but only for selected tables
audit_tables: "ks1.orders,ks1.payments"
audit_roles: "billing_service"
```
Any trigger provided will be updated (or newly created, if previously
absent). To drop a trigger, use `IN *`.
## Decision rule after the change
## Permissions
After the prototype, the rule becomes:
Only superusers can modify audits or turn them on and off.
```text
category matches
&& role matches
&& (
category in {AUTH, ADMIN, DCL}
|| audit_all_keyspaces
|| keyspace is listed in audit_keyspaces
|| table is listed in audit_tables
)
```
Only superusers can read tables that are audit targets; no user can
modify them. Only superusers can drop tables that are audit targets,
after the audit itself is dropped. If a superuser doesn't drop a
target table, it remains in existence indefinitely.
Where:
# Implementation
- `role matches` is always true when `audit_roles` is empty
- `audit_all_keyspaces` is true when the new boolean option is enabled
## Efficient trigger evaluation
For login auditing, the rule is simply:
```text
AUTH category enabled && role matches(login username)
```
## Implementation details
### Configuration parsing
Add two new config entries:
- `db::config::audit_all_keyspaces`
- `db::config::audit_roles`
They should mirror the existing audit selectors:
- `audit_all_keyspaces`: type `named_value<bool>`, liveness `LiveUpdate`, default `false`
- `audit_roles`: type `named_value<sstring>`, liveness `LiveUpdate`, default empty string
Parsing changes:
- keep `parse_audit_tables()` as-is
- keep `parse_audit_keyspaces()` semantics as-is
- add `parse_audit_roles()` that returns a set of role names
- normalize empty or whitespace-only keyspace lists to an empty configuration rather than treating them as real keyspace names
- add cross-field validation so `audit_all_keyspaces: true` cannot be combined with a non-empty
`audit_keyspaces`, both at startup and during live updates
To avoid re-parsing on every request, the `audit::audit` service should store:
```c++
namespace audit {
/// Stores triggers from an AUDIT statement.
class triggers {
// Use trie structures for speedy string lookup.
optional<trie> _ks_trigger, _tbl_trigger, _usr_trigger;
// A logical-AND filter.
optional<unsigned> _cat_trigger;
public:
/// True iff every non-null trigger matches the corresponding ainf element.
bool should_audit(const audit_info& ainf);
};
} // namespace audit
bool _audit_all_keyspaces;
std::set<sstring> _audited_keyspaces;
std::set<sstring> _audited_roles;
```
To prevent modification of target tables, `audit::inspect()` will
check the statement and throw if it is disallowed, similar to what
`check_access()` currently does.
Using a dedicated boolean keeps the hot-path check straightforward and avoids reinterpreting the existing
`_audited_keyspaces` selector.
## Persisting audit definitions
Using `std::set` for the explicit selectors keeps the prototype aligned with the current implementation and minimizes code churn.
If profiling later shows lookup cost matters here, the container choice can be revisited independently of the feature semantics.
Obviously, an audit definition must survive a server restart and stay
consistent among all nodes in a cluster. We'll accomplish both by
storing audits in a system table.
### Audit object changes
The current `audit_info` already carries:
- category
- keyspace
- table
- query text
The username is available separately from `service::query_state` and is already passed to storage helpers when an entry is written.
For the prototype there is no need to duplicate the username into `audit_info`.
Instead:
- change `should_log()` to take the effective username as an additional input
- change `should_log_login()` to check the username against `audit_roles`
- keep the storage helpers unchanged, because they already persist the username
- update the existing internal call sites in `inspect()` and `inspect_login()` to pass the username through
One possible interface shape is:
```c++
bool should_log(std::string_view username, const audit_info* info) const;
bool should_log_login(std::string_view username) const;
```
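The role half of that interface can be sketched as a free-standing analogue of the `_audited_roles` selector. This is illustrative only: the struct and its field name are hypothetical, and the "empty set means audit all roles" rule comes from the backwards-compatibility requirement stated later in this document.

```c++
#include <cassert>
#include <set>
#include <string>
#include <string_view>

// Hypothetical role filter shared by should_log() and should_log_login().
struct role_filter {
    // Empty set == audit every role (the backwards-compatible default).
    std::set<std::string> audited_roles;

    bool matches(std::string_view username) const {
        return audited_roles.empty()
            || audited_roles.count(std::string(username)) > 0;
    }
};
```

Matching on the exact session role name is what keeps the check cheap: it is one set lookup, with no role-graph traversal on the request path.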
### Role semantics
For the prototype, "role" means the role name already associated with the current client session:
- successful authenticated sessions use the session's user name
- failed login events use the login name from the authentication attempt
- failed login events are still subject to `audit_roles`, matched against the attempted login name
This keeps the feature easy to explain and aligns the filter with what users already see in audit output.
The prototype should **not** try to expand inherited roles. If a user logs in as `alice` and inherits permissions from another role,
the audit filter still matches `alice`. This keeps the behavior deterministic and avoids expensive role graph lookups on the request path.
### Keyspace semantics
`audit_all_keyspaces: true` should affect any statement whose `audit_info` carries a keyspace name.
Important consequences:
- it makes `DDL` / `DML` / `QUERY` auditing effectively cluster-wide
- it does not change the existing global handling of `AUTH`, `ADMIN`, and `DCL`
- statements that naturally have no keyspace name continue to depend on their category-specific behavior
No extra schema or metadata scan is required: the request already carries the keyspace information needed for the decision.
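Under those semantics, the keyspace-scope decision reduces to a constant-time check on data already carried by the request. The sketch below uses plain `std::string` members mirroring the `_audit_all_keyspaces` / `_audited_keyspaces` selectors shown earlier; the struct itself is hypothetical.

```c++
#include <cassert>
#include <set>
#include <string>

// Hypothetical keyspace-scope predicate.
struct keyspace_scope {
    bool audit_all_keyspaces = false;
    std::set<std::string> audited_keyspaces;

    // Statements with no keyspace name defer to their category-specific
    // behavior (AUTH/ADMIN/DCL handling stays global and unchanged).
    bool in_scope(const std::string& keyspace) const {
        if (keyspace.empty()) {
            return false; // decided by category handling instead
        }
        return audit_all_keyspaces || audited_keyspaces.count(keyspace) > 0;
    }
};
```

With `audit_all_keyspaces` set, the explicit set is guaranteed empty by the cross-field validation, so the boolean short-circuits and the set is never consulted.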
## Backwards compatibility
This design keeps existing behavior intact:
- existing clusters that do not set `audit_roles` continue to audit all roles
- existing clusters that leave `audit_keyspaces` empty continue to audit no keyspaces
- existing explicit keyspace/table lists keep their current meaning
The feature is enabled only by a new explicit boolean, so existing `audit_keyspaces` values do not need to be reinterpreted.
The only newly-invalid combination is enabling `audit_all_keyspaces` while also listing explicit keyspaces.
## Operational considerations
### Performance and volume
`audit_all_keyspaces: true` can significantly increase audit volume, especially with `QUERY` and `DML`.
The intended mitigation is to combine it with:
- a narrow `audit_categories`
- a narrow `audit_roles`
That combination gives operators a simple and cheap filter model:
- first by category
- then by role
- then by keyspace/table scope
### Live updates
`audit_roles` should follow the same live-update behavior as the current audit filters.
Changing:
- `audit_roles`
- `audit_all_keyspaces`
- `audit_keyspaces`
- `audit_tables`
- `audit_categories`
should update the in-memory selectors on all shards without restarting the node.
### Prototype limitation
Because matching is done against the authenticated session role name, `audit_roles` cannot express "audit everyone who inherits role X".
Operators must list the concrete login roles they want to audit. This is a deliberate trade-off in the prototype to keep matching cheap
and avoid role graph lookups on every audited request.
Example: if `alice` inherits permissions from `admin_role`, configuring `audit_roles: "admin_role"` would not audit requests from
`alice`; to audit those requests, `alice` itself must be listed.
### Audit table schema
No schema change is needed. The audit table already includes `username`, which is sufficient for both storage and later analysis.
## Testing plan
The prototype should extend existing audit coverage rather than introduce a separate test framework.
### Parser / unit coverage
Add focused tests for:
- empty `audit_roles`
- specific `audit_roles`
- `audit_all_keyspaces: true`
- invalid mixed configuration: `audit_all_keyspaces: true` with non-empty `audit_keyspaces`
- empty or whitespace-only keyspace lists such as `",,,"` or `" "`, which should normalize to an empty configuration and therefore audit no keyspaces
- boolean config parsing for `audit_all_keyspaces`
### Behavioral coverage
Extend the existing audit tests in `test/cluster/dtest/audit_test.py` with scenarios such as:
1. `audit_all_keyspaces: true` audits statements in multiple keyspaces without listing them explicitly
2. `audit_roles: "alice"` logs requests from `alice` but not from `bob`
3. `audit_all_keyspaces: true` + `audit_roles: "alice"` only logs `alice`'s traffic cluster-wide
4. login auditing respects `audit_roles`
5. live-updating `audit_roles` changes behavior without restart
6. setting `audit_all_keyspaces: true` together with explicit `audit_keyspaces` is rejected with a clear error
## Future evolution
This prototype is deliberately small, but it fits a broader audit-spec design if we decide to revisit that later.
In a future CQL-driven design, these two additions map naturally to triggers such as:
- `TRIGGER KEYSPACE IN *`
- `TRIGGER ROLE IN (...)`
That means the prototype is not throwaway work: it improves the current operational model immediately while keeping a clean path
toward richer audit objects in the future.

View File

@@ -2236,7 +2236,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return m.start();
}).get();
api::set_server_storage_service(ctx, ss, group0_client).get();
api::set_server_storage_service(ctx, ss, snapshot_ctl, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});

View File

@@ -371,7 +371,7 @@ public:
}
void on_preemptive_aborted() {
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
if (_state != reader_permit::state::waiting_for_admission) {
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
}
@@ -1533,19 +1533,24 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
// + permit.timeout() < db::no_timeout -- to avoid preemptively aborting reads without timeout.
// Useful in tests when _preemptive_abort_factor is set to 1.0
// to avoid additional sleeps to wait for the read to be shed.
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() &&
permit.timeout() < db::no_timeout &&
remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
_name,
std::chrono::duration_cast<ms>(time_budget - remaining_time),
std::chrono::duration_cast<ms>(time_budget),
_preemptive_abort_factor());
continue;
//
// Only apply to permits waiting for admission -- permits waiting for memory are already
// executing reads and should not be preemptively aborted.
if (permit.get_state() == reader_permit::state::waiting_for_admission) {
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() &&
permit.timeout() < db::no_timeout &&
remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
_name,
std::chrono::duration_cast<ms>(time_budget - remaining_time),
std::chrono::duration_cast<ms>(time_budget),
_preemptive_abort_factor());
continue;
}
}
if (permit.get_state() == reader_permit::state::waiting_for_memory) {

View File

@@ -2362,6 +2362,15 @@ static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
}
}
static future<repair_rows_on_wire> clone_gently(const repair_rows_on_wire& rows) {
repair_rows_on_wire cloned;
for (const auto& row : rows) {
cloned.push_back(row);
co_await seastar::coroutine::maybe_yield();
}
co_return cloned;
}
static future<> repair_put_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
@@ -2388,7 +2397,9 @@ static future<> repair_put_row_diff_with_rpc_stream_process_op(
co_await rm->put_row_diff_handler(std::move(*fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
} else {
co_await rm->put_row_diff_handler(*fp);
// Gently clone to avoid copy stall on destination shard
repair_rows_on_wire local_rows = co_await clone_gently(*fp);
co_await seastar::when_all_succeed(rm->put_row_diff_handler(std::move(local_rows)), utils::clear_gently(fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
}
});

View File

@@ -2794,12 +2794,18 @@ future<> storage_service::raft_decommission() {
}
}
future<> storage_service::decommission() {
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
return seastar::async([&ss] {
future<> storage_service::decommission(sharded<db::snapshot_ctl>& snapshot_ctl) {
return run_with_api_lock(sstring("decommission"), [&] (storage_service& ss) {
return seastar::async([&] {
if (ss._operation_mode != mode::NORMAL) {
throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
}
snapshot_ctl.invoke_on_all([](auto& sctl) {
return sctl.disable_all_operations();
}).get();
slogger.info("DECOMMISSIONING: disabled backup and snapshots");
ss.raft_decommission().get();
ss.stop_transport().get();

View File

@@ -77,6 +77,7 @@ namespace db {
class system_distributed_keyspace;
class system_keyspace;
class batchlog_manager;
class snapshot_ctl;
namespace view {
class view_builder;
class view_building_worker;
@@ -666,7 +667,7 @@ private:
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const;
public:
future<> decommission();
future<> decommission(sharded<db::snapshot_ctl>&);
private:
future<> unbootstrap();

View File

@@ -471,14 +471,17 @@ def test_streams_operations(test_table_s, dynamodbstreams, metrics):
# to update latencies for one kind of operation (#17616, and compare #9406),
# and to do that checking that ..._count increases for that op is enough.
@contextmanager
def check_sets_latency(metrics, operation_names):
def check_sets_latency_by_metric(metrics, operation_names, metric_name):
the_metrics = get_metrics(metrics)
saved_latency_count = { x: get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': x}, the_metrics) for x in operation_names }
saved_latency_count = { x: get_metric(metrics, f'{metric_name}_count', {'op': x}, the_metrics) for x in operation_names }
yield
the_metrics = get_metrics(metrics)
for op in operation_names:
# The total "count" on all shards should strictly increase
assert saved_latency_count[op] < get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': op}, the_metrics)
assert saved_latency_count[op] < get_metric(metrics, f'{metric_name}_count', {'op': op}, the_metrics)
def check_sets_latency(metrics, operation_names):
return check_sets_latency_by_metric(metrics, operation_names, 'scylla_alternator_op_latency')
# Test latency metrics for PutItem, GetItem, DeleteItem, UpdateItem.
# We can't check what exactly the latency is - just that it gets updated.
@@ -494,6 +497,18 @@ def test_item_latency(test_table_s, metrics):
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
def test_item_latency_per_table(test_table_s, metrics):
with check_sets_latency_by_metric(metrics, ['DeleteItem', 'GetItem', 'PutItem', 'UpdateItem', 'BatchWriteItem', 'BatchGetItem'], 'scylla_alternator_table_op_latency'):
p = random_string()
test_table_s.put_item(Item={'p': p})
test_table_s.get_item(Key={'p': p})
test_table_s.delete_item(Key={'p': p})
test_table_s.update_item(Key={'p': p})
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
# Test latency metrics for GetRecords. Other Streams-related operations -
# ListStreams, DescribeStream, and GetShardIterator, have an operation
# count (tested above) but do NOT currently have a latency histogram.

View File

@@ -2409,6 +2409,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
}
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_aborted_permit) {
simple_schema s;
const auto schema = s.schema();
const std::string test_name = get_name();
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
// Ensure permits are shed immediately during admission.
@@ -2421,27 +2425,24 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_ab
// Set a ridiculously long timeout to ensure permit will not be rejected due to timeout
auto timeout = db::timeout_clock::now() + 60min;
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, timeout, {}).get();
auto permit1 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
auto units1 = permit1.request_memory(2024).get();
auto permit2_units1 = permit2.request_memory(1024).get();
auto permit1_units = permit1.request_memory(8 * 1024).get();
reader_permit_opt permit2_holder;
auto permit2_fut = semaphore.with_permit(schema, test_name.c_str(), 1024, timeout, {}, permit2_holder, [] (reader_permit) {
BOOST_FAIL("unexpected call to with permit lambda");
return make_ready_future<>();
});
// permit1 is now the blessed one
// Triggers maybe_admit_waiters()
units1.reset_to_zero();
auto permit2_units2_fut = permit2.request_memory(1024);
BOOST_REQUIRE(!permit2_units2_fut.available());
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1);
BOOST_REQUIRE(eventually_true([&] { return permit2_fut.failed(); }));
BOOST_REQUIRE_THROW(permit2_fut.get(), named_semaphore_aborted);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
permit1_units.reset_to_zero();
const auto futures_failed = eventually_true([&] { return permit2_units2_fut.failed(); });
BOOST_CHECK(futures_failed);
BOOST_CHECK_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
simple_schema ss;
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(ss.schema(), permit2));
BOOST_REQUIRE_THROW(permit2_units2_fut.get(), named_semaphore_aborted);
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(schema, *permit2_holder));
BOOST_CHECK(!irh);
}
/// Test that if no count resources are currently used, a single permit is always admitted regardless of available memory.
@@ -2555,4 +2556,43 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
}
}
// Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-1016
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_preemptive_abort_requested_memory_leak) {
const ssize_t memory = 1024;
const uint32_t serialize_limit_multiplier = 2;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(),
2, // count
memory,
100, // max queue length
utils::updateable_value(serialize_limit_multiplier),
utils::updateable_value(std::numeric_limits<uint32_t>::max()), // kill limit multiplier
utils::updateable_value<uint32_t>(1), // cpu concurrency
utils::updateable_value<float>(1.0f)); // preemptive abort factor
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, "permit1", memory/2, db::no_timeout, {}).get();
reader_permit_opt permit2 = semaphore.obtain_permit(nullptr, "permit2", memory/2, db::timeout_clock::now() + 60s, {}).get();
auto units1 = permit1.request_memory(memory * serialize_limit_multiplier).get();
auto mem_fut = permit2->request_memory(1024);
BOOST_REQUIRE(!mem_fut.available());
// Triggers maybe_admit_waiters()
units1.reset_to_zero();
// Consume mem_fut to properly account for the 1024 bytes consumed by
// on_granted_memory(). In debug mode, the .then() continuation that creates
// the resource_units may be deferred due to yielding, so we must .get() it
// to ensure the resource_units is created and can be properly destroyed.
{ auto u = mem_fut.get(); }
// on_granted_memory() consumes stale _requested_memory (1024) + 512,
// but resource_units only tracks 512 — the difference leaks.
{ auto u = permit2->request_memory(512).get(); }
// Shouldn't fail if SCYLLADB-1016 is fixed.
permit2 = {};
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -61,7 +61,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
session = c.connect()
logging.info("Performing SELECT(*) FROM system.clients")
res = session.execute("SELECT COUNT(*) FROM system.clients WHERE connection_stage = 'AUTHENTICATING' ALLOW FILTERING;")
count = res[0][0]
count = res.one()[0]
logging.info(f"Observed {count} AUTHENTICATING connections...")
if count >= num_connections/2:
connections_observed = True

View File

@@ -11,7 +11,7 @@ import pytest
import time
import random
from test.pylib.manager_client import ManagerClient
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.cluster.object_store.conftest import format_tuples
from test.cluster.util import wait_for_cql_and_get_hosts, get_replication, new_test_keyspace
from test.pylib.rest_client import read_barrier
@@ -19,6 +19,7 @@ from test.pylib.util import unique_name, wait_all
from cassandra.cluster import ConsistencyLevel
from collections import defaultdict
from test.pylib.util import wait_for
from test.pylib.rest_client import HTTPError
import statistics
logger = logging.getLogger(__name__)
@@ -174,9 +175,8 @@ async def test_backup_endpoint_config_is_live_updateable(manager: ManagerClient,
assert status is not None
assert status['state'] == 'done'
async def do_test_backup_abort(manager: ManagerClient, object_storage,
breakpoint_name, min_files, max_files = None):
async def do_test_backup_helper(manager: ManagerClient, object_storage,
breakpoint_name, handler, num_servers: int = 1):
'''helper for backup abort testing'''
objconf = object_storage.create_endpoint_conf()
@@ -186,10 +186,9 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
server = await manager.server_add(config=cfg, cmdline=cmd)
server = (await manager.servers_add(num_servers, config=cfg, cmdline=cmd))[0]
ks, cf = create_ks_and_cf(manager.get_cql())
snap_name, files = await take_snapshot_on_one_server(ks, server, manager, logger)
assert len(files) > 1
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
@@ -206,26 +205,35 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
print(f'Started task {tid}, aborting it early')
await log.wait_for(breakpoint_name + ': waiting', from_mark=mark)
await manager.api.abort_task(server.ip_addr, tid)
await manager.api.message_injection(server.ip_addr, breakpoint_name)
status = await manager.api.wait_task(server.ip_addr, tid)
print(f'Status: {status}')
assert (status is not None) and (status['state'] == 'failed')
assert "seastar::abort_requested_exception (abort requested)" in status['error']
await handler(server, prefix, files, tid)
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
uploaded_count = 0
for f in files:
in_backup = f'{prefix}/{f}' in objects
print(f'Check {f} is in backup: {in_backup}')
if in_backup:
uploaded_count += 1
# Note: since s3 client is abortable and run async, we might fail even the first file
# regardless of if we set the abort status before or after the upload is initiated.
# Parallelism is a pain.
assert min_files <= uploaded_count < len(files)
assert max_files is None or uploaded_count < max_files
async def do_test_backup_abort(manager: ManagerClient, object_storage,
breakpoint_name, min_files, max_files = None):
'''helper for backup abort testing'''
async def abort_and_check(server, prefix, files, tid):
assert len(files) > 1
await manager.api.abort_task(server.ip_addr, tid)
await manager.api.message_injection(server.ip_addr, breakpoint_name)
status = await manager.api.wait_task(server.ip_addr, tid)
print(f'Status: {status}')
assert (status is not None) and (status['state'] == 'failed')
assert "seastar::abort_requested_exception (abort requested)" in status['error']
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
uploaded_count = 0
for f in files:
in_backup = f'{prefix}/{f}' in objects
print(f'Check {f} is in backup: {in_backup}')
if in_backup:
uploaded_count += 1
# Note: since s3 client is abortable and run async, we might fail even the first file
# regardless of if we set the abort status before or after the upload is initiated.
# Parallelism is a pain.
assert min_files <= uploaded_count < len(files)
assert max_files is None or uploaded_count < max_files
await do_test_backup_helper(manager, object_storage, breakpoint_name, abort_and_check)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@@ -456,7 +464,8 @@ class topo:
self.racks = racks
self.dcs = dcs
async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, object_storage=None):
async def create_cluster(topology, manager, logger, object_storage=None):
rf_rack_valid_keyspaces = (topology.rf <= topology.racks)
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
cfg = {'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
@@ -679,19 +688,19 @@ class SSTablesOnObjectStorage:
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
@pytest.mark.parametrize("topology", [
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
])
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology_rf_validity):
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology):
'''Check that restoring of a cluster with stream scopes works'''
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnObjectStorage(object_storage))
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnObjectStorage(object_storage))
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity, sstables_storage):
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology, sstables_storage):
'''
This test creates a cluster specified by the topology parameter above,
with a configurable number of nodes, racks, datacenters, and replication factor.
@@ -707,9 +716,7 @@ async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topo
This stage parses the logs and checks that the data was streamed to nodes within the configured scope.
'''
topology, rf_rack_valid_keyspaces = topology_rf_validity
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, sstables_storage.object_storage)
servers, host_ids = await create_cluster(topology, manager, logger, sstables_storage.object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -858,7 +865,8 @@ async def test_backup_broken_streaming(manager: ManagerClient, s3_storage):
res = cql.execute(f"SELECT COUNT(*) FROM {keyspace}.{table} BYPASS CACHE USING TIMEOUT 600s;")
assert res[0].count == expected_rows, f"number of rows after restore is incorrect: {res[0].count}"
row = res.one()
assert row.count == expected_rows, f"number of rows after restore is incorrect: {row.count}"
log = await manager.server_open_log(server.server_id)
await log.wait_for("fully contained SSTables to local node from object storage", timeout=10)
# just make sure we had partially contained sstables as well
@@ -878,7 +886,7 @@ async def test_restore_primary_replica_same_domain(manager: ManagerClient, objec
ks = 'ks'
cf = 'cf'
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -940,7 +948,7 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
ks = 'ks'
cf = 'cf'
servers, host_ids = await create_cluster(topology, True if domain == 'rack' else False, manager, logger, object_storage)
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -971,3 +979,39 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
streamed_to = set([ r[1].group(1) for r in res ])
logger.info(f'{s.ip_addr} {host_ids[s.server_id]} streamed to {streamed_to}')
assert len(streamed_to) == 2
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_decommision_waits_for_backup(manager: ManagerClient, object_storage):
'''check that backing up a snapshot for a keyspace blocks decommission'''
async def decommission_and_check(server: ServerInfo, prefix: str, files, tid):
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
async def finish_backup():
# wait for snapshot to stop on waiting for backup
await log.wait_for("Waiting for snapshot/backup tasks to finish", from_mark=mark)
mark2 = await log.mark()
# let the backup run and finish
await manager.api.message_injection(server.ip_addr, "backup_task_pre_upload")
status = await manager.api.wait_task(server.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
uploaded_count = 0
# all files should be uploaded. note: can be zero due to two nodes
for f in files:
in_backup = f'{prefix}/{f}' in objects
print(f'Check {f} is in backup: {in_backup}')
if in_backup:
uploaded_count += 1
assert uploaded_count == len(files)
# Now wait for decommission to finish
await log.wait_for("DECOMMISSIONING: disabled backup and snapshots", from_mark=mark2)
await asyncio.gather(manager.decommission_node(server.server_id), finish_backup())
await do_test_backup_helper(manager, object_storage, "backup_task_pre_upload", decommission_and_check, 2)

View File

@@ -44,7 +44,6 @@ async def test_gossiper_empty_self_id_on_shadow_round(manager: ManagerClient):
logging.info("Starting cluster normally")
node1 = await manager.server_add(cmdline=cmdline, start=False, config=cfg)
manager.server_add(cmdline=cmdline, start=False)
node1_log = await manager.server_open_log(node1.server_id)
node2 = await manager.server_add(cmdline=cmdline, start=False, seeds=[node1.ip_addr])
node2_log = await manager.server_open_log(node2.server_id)

View File

@@ -7,6 +7,7 @@ import logging
import pytest
import asyncio
from test.pylib.internal_types import ServerNum
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, InjectionHandler, read_barrier
from test.cluster.util import create_new_test_keyspace
@@ -19,6 +20,20 @@ def fixture_raft_op_timeout(build_mode):
return 10000 if build_mode == 'debug' else 1000
async def update_group0_raft_op_timeout(server_id: ServerNum, manager: ManagerClient, timeout: int) -> None:
logger.info(f"Updating group0_raft_op_timeout_in_ms on server {server_id} to {timeout}")
running_ids = [srv.server_id for srv in await manager.running_servers()]
if server_id in running_ids:
# If the node is alive, server_update_config only sends the SIGHUP signal to the Scylla process, so awaiting it
# doesn't guarantee that the new config file is active. Work around this by looking at the logs.
log_file = await manager.server_open_log(server_id)
mark = await log_file.mark()
await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout)
await log_file.wait_for("completed re-reading configuration file", from_mark=mark, timeout=60)
else:
await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
@@ -41,7 +56,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
config = {
'direct_failure_detector_ping_timeout_in_ms': 300,
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -63,6 +77,10 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
manager.server_stop_gracefully(servers[3].server_id),
manager.server_stop_gracefully(servers[4].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout)
for srv in servers[:2]))
logger.info("starting a sixth node with no quorum")
await manager.server_add(expected_error="raft operation \\[read_barrier\\] timed out, there is no raft quorum",
timeout=60)
@@ -75,7 +93,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_timeout: int) -> None:
config = {
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -106,6 +123,9 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
manager.server_stop_gracefully(servers[2].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
logger.info("release join-node-before-add-entry injection")
await injection_handler.message()
@@ -125,7 +145,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
logger.info("adding a fourth node")
servers += [await manager.server_add(config={
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -152,6 +171,9 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
manager.server_stop_gracefully(servers[2].server_id))
# Do it here to prevent unexpected timeouts before quorum loss.
await update_group0_raft_op_timeout(servers[3].server_id, manager, raft_op_timeout)
logger.info("release join-node-response_handler-before-read-barrier injection")
injection_handler = InjectionHandler(manager.api,
'join-node-response_handler-before-read-barrier',
@@ -168,7 +190,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: int) -> None:
logger.info("starting a first node (the leader)")
servers = [await manager.server_add(config={
'group0_raft_op_timeout_in_ms': raft_op_timeout,
'error_injections_at_startup': [
{
'name': 'raft-group-registry-fd-threshold-in-ms',
@@ -188,6 +209,9 @@ async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: in
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
manager.server_stop_gracefully(servers[2].server_id))
+# Do it here to prevent unexpected timeouts before quorum loss.
+await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
logger.info("attempting removenode for the second node")
await manager.remove_node(servers[0].server_id, servers[1].server_id,
expected_error="raft operation [read_barrier] timed out, there is no raft quorum",
@@ -231,9 +255,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
await asyncio.gather(*(manager.server_stop(srv.server_id) for srv in servers))
# This ensures the read barriers below fail quickly without group 0 quorum.
logger.info(f"Decreasing group0_raft_op_timeout_in_ms on {servers}")
-await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', raft_op_timeout)
-for srv in servers))
+await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout) for srv in servers))
logger.info(f"Restarting {servers[:2]} with no group 0 quorum")
for idx, srv in enumerate(servers[:2]):
@@ -245,8 +267,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
# Increase the timeout back to 300s to ensure the new group 0 leader is elected before the first read barrier below
# times out.
-await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', 300000)
-for srv in servers))
+await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, 300000) for srv in servers))
logger.info(f"Restarting {servers[2:]} with group 0 quorum")
for srv in servers[2:]:
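The hunks above replace repeated inline `server_update_config` gathers with a shared `update_group0_raft_op_timeout` helper. A minimal sketch of that pattern, assuming the helper simply wraps the per-server config call (the `FakeManager` is a hypothetical stand-in for illustration, not the real `ManagerClient` API):

```python
import asyncio

async def update_group0_raft_op_timeout(server_id, manager, timeout_ms):
    # Sketch of the helper the diff switches to (signature inferred from the
    # call sites above); the real helper may do more, e.g. logging.
    await manager.server_update_config(server_id, 'group0_raft_op_timeout_in_ms', timeout_ms)

class FakeManager:
    # Illustrative stand-in recording config updates instead of applying them.
    def __init__(self):
        self.calls = []

    async def server_update_config(self, server_id, key, value):
        self.calls.append((server_id, key, value))

async def main():
    manager = FakeManager()
    servers = [1, 2, 3]
    # Apply the lowered timeout to several servers concurrently, as the tests do.
    await asyncio.gather(*(update_group0_raft_op_timeout(s, manager, 1000) for s in servers))
    return manager.calls

calls = asyncio.run(main())
```

Centralizing the config key in one helper keeps the tests from drifting if the option name or update mechanism changes.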


@@ -75,16 +75,16 @@ class SSTablesOnLocalStorage:
await asyncio.gather(*(self.refresh_one(manager, s, ks, cf, sstables, scope, primary_replica_only) for s, sstables in sstables_per_server.items()))
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
@pytest.mark.parametrize("topology", [
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
])
-async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
+async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology):
'''Check that refreshing of a cluster with stream scopes works'''
-await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnLocalStorage())
+await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnLocalStorage())
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
@@ -94,7 +94,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)
-servers, host_ids = await create_cluster(topology, True, manager, logger)
+servers, host_ids = await create_cluster(topology, manager, logger)
cql = manager.get_cql()


@@ -71,7 +71,7 @@ async def test_snapshot_on_all_nodes(manager: ManagerClient):
"""
topology = topo(rf = 3, nodes = 3, racks = 3, dcs = 1)
-servers, _ = await create_cluster(topology, True, manager, logger)
+servers, _ = await create_cluster(topology, manager, logger)
snapshot_name = unique_name('snap_')


@@ -182,7 +182,7 @@ async def test_tombstone_gc_insert_flush(manager: ManagerClient):
async def test_tablet_manual_repair_all_tokens(manager: ManagerClient):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
token = "all"
-now = datetime.datetime.utcnow()
+now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
map1 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
await guarantee_repair_time_next_second()


@@ -1324,7 +1324,7 @@ async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, n
return servers
-class TestContext:
+class Context:
def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
self.ks = ks
self.table = table
@@ -1347,7 +1347,7 @@ async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
-yield TestContext(ks, table, rf, initial_tablets, num_keys)
+yield Context(ks, table, rf, initial_tablets, num_keys)
finally:
await cql.run_async(f"DROP KEYSPACE {ks}")
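The `TestContext` to `Context` rename likely matters because pytest collects any class matching its default `Test*` prefix as a test class and emits `PytestCollectionWarning` when such a class defines `__init__`; a neutral name keeps the helper out of collection. A sketch of the renamed holder, assuming it is a plain data container as the constructor above suggests:

```python
# A plain data holder; naming it "Test*" would make pytest try to collect it
# as a test class and warn about the __init__ constructor.
class Context:
    def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
        self.ks = ks
        self.table = table
        self.rf = rf
        self.initial_tablets = initial_tablets
        self.num_keys = num_keys

ctx = Context("ks1", "t1", 3, 16, 100)
```
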


@@ -53,6 +53,11 @@ filterwarnings =
ignore::DeprecationWarning:importlib._bootstrap
ignore::DeprecationWarning:botocore
ignore::DeprecationWarning:pytest_elk_reporter
+ignore::sqlalchemy.exc.MovedIn20Warning:kmip.pie.sqltypes
+ignore::cryptography.utils.CryptographyDeprecationWarning:kmip
+ignore:TypeDecorator .* will not produce a cache key:sqlalchemy.exc.SAWarning:kmip
+ignore:Exception in thread[\s\S]*kmip[\s\S]*shutdown:pytest.PytestUnhandledThreadExceptionWarning
tmp_path_retention_count = 1
tmp_path_retention_policy = failed
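The new `filterwarnings` entries above use Python's warning-filter syntax, `action:message-regex:category:module`, which pytest reuses in its ini option. A programmatic sketch of how a message-based filter like the `TypeDecorator` entry behaves (the warning text here is illustrative):

```python
import warnings

with warnings.catch_warnings(record=True) as caught:
    # Escalate everything, then ignore messages matching the regex; the most
    # recently added filter is consulted first, so the ignore takes precedence.
    warnings.simplefilter("error")
    warnings.filterwarnings("ignore", message="TypeDecorator .* will not produce a cache key")
    warnings.warn("TypeDecorator Foo will not produce a cache key", UserWarning)

# The matching warning was suppressed rather than raised or recorded.
assert caught == []
```

Pinning filters to a module (the trailing `:kmip` field) keeps the suppression from hiding the same warning category elsewhere in the test suite.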