Compare commits
24 Commits
copilot/us
...
copilot/pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
173fb1e6d3 | ||
|
|
e252bb1550 | ||
|
|
5713b5efd1 | ||
|
|
979ec5ada8 | ||
|
|
67503a350b | ||
|
|
a90490c3cf | ||
|
|
6f957ea4e0 | ||
|
|
f6605f7b66 | ||
|
|
6cb263bab0 | ||
|
|
035aa90d4b | ||
|
|
40d180a7ef | ||
|
|
9de8d6798e | ||
|
|
a5df2e79a7 | ||
|
|
85d5073234 | ||
|
|
3e4e0c57b8 | ||
|
|
526e5986fe | ||
|
|
bc0952781a | ||
|
|
755d528135 | ||
|
|
d544d8602d | ||
|
|
313985fed7 | ||
|
|
4c4d043a3b | ||
|
|
3b9cd52a95 | ||
|
|
aca5284b13 | ||
|
|
29e0b4e08c |
35
.github/workflows/call_jira_sync.yml
vendored
35
.github/workflows/call_jira_sync.yml
vendored
@@ -1,8 +1,8 @@
|
||||
name: Sync Jira Based on PR Events
|
||||
name: Sync Jira Based on PR Events
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
|
||||
types: [opened, edited, ready_for_review, review_requested, labeled, unlabeled, closed]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
@@ -10,32 +10,9 @@ permissions:
|
||||
issues: write
|
||||
|
||||
jobs:
|
||||
jira-sync-pr-opened:
|
||||
if: github.event.action == 'opened'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-in-review:
|
||||
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-add-label:
|
||||
if: github.event.action == 'labeled'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-status-remove-label:
|
||||
if: github.event.action == 'unlabeled'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-status-pr-closed:
|
||||
if: github.event.action == 'closed'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
|
||||
jira-sync:
|
||||
uses: scylladb/github-automation/.github/workflows/main_pr_events_jira_sync.yml@main
|
||||
with:
|
||||
caller_action: ${{ github.event.action }}
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
@@ -3463,7 +3463,11 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
if (should_add_wcu) {
|
||||
rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
|
||||
}
|
||||
_stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
auto duration = std::chrono::steady_clock::now() - start_time;
|
||||
_stats.api_operations.batch_write_item_latency.mark(duration);
|
||||
for (const auto& w : per_table_wcu) {
|
||||
w.first->api_operations.batch_write_item_latency.mark(duration);
|
||||
}
|
||||
co_return rjson::print(std::move(ret));
|
||||
}
|
||||
|
||||
@@ -4974,7 +4978,12 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
if (!some_succeeded && eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::move(eptr));
|
||||
}
|
||||
_stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
auto duration = std::chrono::steady_clock::now() - start_time;
|
||||
_stats.api_operations.batch_get_item_latency.mark(duration);
|
||||
for (const table_requests& rs : requests) {
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->api_operations.batch_get_item_latency.mark(duration);
|
||||
}
|
||||
if (is_big(response)) {
|
||||
co_return make_streamed(std::move(response));
|
||||
} else {
|
||||
|
||||
@@ -122,9 +122,9 @@ future<> unset_thrift_controller(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_thrift_controller(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
|
||||
return ctx.http_server.set_routes([&ctx, &ss, &group0_client] (routes& r) {
|
||||
set_storage_service(ctx, r, ss, group0_client);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
|
||||
return ctx.http_server.set_routes([&ctx, &ss, &ssc, &group0_client] (routes& r) {
|
||||
set_storage_service(ctx, r, ss, ssc, group0_client);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -98,7 +98,7 @@ future<> set_server_config(http_context& ctx, db::config& cfg);
|
||||
future<> unset_server_config(http_context& ctx);
|
||||
future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
|
||||
future<> unset_server_snitch(http_context& ctx);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
|
||||
future<> unset_server_storage_service(http_context& ctx);
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
|
||||
future<> unset_server_client_routes(http_context& ctx);
|
||||
|
||||
@@ -835,9 +835,9 @@ rest_force_keyspace_flush(http_context& ctx, std::unique_ptr<http::request> req)
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_decommission(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
rest_decommission(sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, std::unique_ptr<http::request> req) {
|
||||
apilog.info("decommission");
|
||||
return ss.local().decommission().then([] {
|
||||
return ss.local().decommission(ssc).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
}
|
||||
@@ -1782,7 +1782,7 @@ rest_bind(FuncType func, BindArgs&... args) {
|
||||
return std::bind_front(func, std::ref(args)...);
|
||||
}
|
||||
|
||||
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
|
||||
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
|
||||
ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
|
||||
ss::toppartitions_generic.set(r, rest_bind(rest_toppartitions_generic, ctx));
|
||||
ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
|
||||
@@ -1799,7 +1799,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
|
||||
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
|
||||
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss));
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc));
|
||||
ss::move.set(r, rest_bind(rest_move, ss));
|
||||
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
|
||||
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
|
||||
@@ -2141,6 +2141,7 @@ void unset_snapshot(http_context& ctx, routes& r) {
|
||||
ss::start_backup.unset(r);
|
||||
cf::get_true_snapshots_size.unset(r);
|
||||
cf::get_all_true_snapshots_size.unset(r);
|
||||
ss::decommission.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ struct scrub_info {
|
||||
|
||||
scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::request> req);
|
||||
|
||||
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
|
||||
void unset_storage_service(http_context& ctx, httpd::routes& r);
|
||||
void set_sstables_loader(http_context& ctx, httpd::routes& r, sharded<sstables_loader>& sst_loader);
|
||||
void unset_sstables_loader(http_context& ctx, httpd::routes& r);
|
||||
|
||||
@@ -39,10 +39,19 @@ snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::stor
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::stop() {
|
||||
co_await _ops.close();
|
||||
co_await disable_all_operations();
|
||||
co_await _task_manager_module->stop();
|
||||
}
|
||||
|
||||
// Closes `_ops` and waits for the close to complete, unless `_ops` is already
// closed, in which case this is a no-op (so it can be called more than once).
// NOTE(review): `_ops` looks like a gate tracking in-flight snapshot/backup
// operations -- confirm against the class definition.
future<> snapshot_ctl::disable_all_operations() {
|
||||
if (!_ops.is_closed()) {
|
||||
// Only log when `_ops` reports a non-zero count, i.e. when there is something to wait for.
if (_ops.get_count()) {
|
||||
snap_log.info("Waiting for snapshot/backup tasks to finish");
|
||||
}
|
||||
co_await _ops.close();
|
||||
}
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
|
||||
auto& ks = _db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {
|
||||
|
||||
@@ -120,6 +120,8 @@ public:
|
||||
|
||||
future<int64_t> true_snapshots_size();
|
||||
future<int64_t> true_snapshots_size(sstring ks, sstring cf);
|
||||
|
||||
future<> disable_all_operations();
|
||||
private:
|
||||
config _config;
|
||||
sharded<replica::database>& _db;
|
||||
|
||||
@@ -292,8 +292,8 @@ For example::
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
|
||||
or columns provided in a definition of the index.
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
|
||||
See :ref:`WHERE <where-clause>`.
|
||||
|
||||
For example::
|
||||
|
||||
@@ -301,10 +301,6 @@ For example::
|
||||
WHERE user_id = 'user123'
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
The supported operations are equal relations (``=`` and ``IN``) with restrictions as in regular ``WHERE`` clauses. See :ref:`WHERE <where-clause>`.
|
||||
|
||||
Other filtering scenarios are currently not supported.
|
||||
|
||||
.. note::
|
||||
|
||||
Vector indexes are supported in ScyllaDB Cloud only in clusters that have the Vector Search feature enabled.
|
||||
|
||||
@@ -1,111 +1,347 @@
|
||||
# Introduction
|
||||
# Prototype design: auditing all keyspaces and per-role auditing
|
||||
|
||||
Similar to the approach described in CASSANDRA-12151, we add the
|
||||
concept of an audit specification. An audit has a target (syslog or a
|
||||
table) and a set of events/actions that it wants recorded. We
|
||||
introduce new CQL syntax for Scylla users to describe and manipulate
|
||||
audit specifications.
|
||||
## Summary
|
||||
|
||||
Prior art:
|
||||
- Microsoft SQL Server [audit
|
||||
description](https://docs.microsoft.com/en-us/sql/relational-databases/security/auditing/sql-server-audit-database-engine?view=sql-server-ver15)
|
||||
- pgAudit [docs](https://github.com/pgaudit/pgaudit/blob/master/README.md)
|
||||
- MySQL audit_log docs in
|
||||
[MySQL](https://dev.mysql.com/doc/refman/8.0/en/audit-log.html) and
|
||||
[Azure](https://docs.microsoft.com/en-us/azure/mysql/concepts-audit-logs)
|
||||
- DynamoDB can [use CloudTrail](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/logging-using-cloudtrail.html) to log all events
|
||||
Extend the existing `scylla.yaml`-driven audit subsystem with two focused capabilities:
|
||||
|
||||
# CQL extensions
|
||||
1. allow auditing **all keyspaces** without enumerating them one by one
|
||||
2. allow auditing only a configured set of **roles**
|
||||
|
||||
## Create an audit
|
||||
The prototype should stay close to the current implementation in `audit/`:
|
||||
|
||||
```cql
|
||||
CREATE AUDIT [IF NOT EXISTS] audit-name WITH TARGET { SYSLOG | table-name }
|
||||
[ AND TRIGGER KEYSPACE IN (ks1, ks2, ks3) ]
|
||||
[ AND TRIGGER TABLE IN (tbl1, tbl2, tbl3) ]
|
||||
[ AND TRIGGER ROLE IN (usr1, usr2, usr3) ]
|
||||
[ AND TRIGGER CATEGORY IN (cat1, cat2, cat3) ]
|
||||
;
|
||||
- keep the existing backends (`table`, `syslog`, or both)
|
||||
- keep the existing category / keyspace / table filters
|
||||
- preserve live updates for audit configuration
|
||||
- avoid any schema change to `audit.audit_log`
|
||||
|
||||
This is intentionally a small extension of the current auditing model, not a redesign around new CQL statements such as `CREATE AUDIT`.
|
||||
|
||||
## Motivation
|
||||
|
||||
Today Scylla exposes three main audit selectors:
|
||||
|
||||
- `audit_categories`
|
||||
- `audit_tables`
|
||||
- `audit_keyspaces`
|
||||
|
||||
This leaves two operational gaps:
|
||||
|
||||
1. **Auditing all keyspaces is cumbersome.**
|
||||
Large installations may create keyspaces dynamically, or manage many tenant keyspaces. Requiring operators to keep
|
||||
`audit_keyspaces` synchronized with the full keyspace list is error-prone and defeats the point of cluster-wide auditing.
|
||||
2. **Auditing is all-or-nothing with respect to users.**
|
||||
Once a category/keyspace/table combination matches, any authenticated user generating that traffic is audited.
|
||||
Operators want to narrow the scope to specific tenants, service accounts, or privileged roles.
|
||||
|
||||
These two additions also work well together: "audit all keyspaces, but only for selected roles" is a practical way to reduce
|
||||
both audit volume and performance impact.
|
||||
|
||||
## Goals
|
||||
|
||||
- Add a way to express "all keyspaces" in the current configuration model.
|
||||
- Add a new role filter that limits auditing to selected roles.
|
||||
- Preserve backwards compatibility for existing configurations.
|
||||
- Keep the evaluation cheap on the request path.
|
||||
- Support live configuration updates, consistent with the existing audit options.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- Introducing `CREATE AUDIT`, `ALTER AUDIT`, or other new CQL syntax.
|
||||
- Adding per-role audit destinations.
|
||||
- Adding different categories per role.
|
||||
- Expanding role matching through the full granted-role graph in the prototype.
|
||||
- Changing the on-disk audit table schema.
|
||||
|
||||
## Current behavior
|
||||
|
||||
At the moment, audit logging is controlled by:
|
||||
|
||||
- `audit`
|
||||
- `audit_categories`
|
||||
- `audit_tables`
|
||||
- `audit_keyspaces`
|
||||
|
||||
The current decision rule in `audit::should_log()` is effectively:
|
||||
|
||||
```text
|
||||
category matches
|
||||
&& (
|
||||
keyspace is listed in audit_keyspaces
|
||||
|| table is listed in audit_tables
|
||||
|| category in {AUTH, ADMIN, DCL}
|
||||
)
|
||||
```
|
||||
|
||||
From this point on, every database event that matches all present
|
||||
triggers will be recorded in the target. When the target is a table,
|
||||
it behaves like the [current
|
||||
design](https://docs.scylladb.com/operating-scylla/security/auditing/#table-storage).
|
||||
Observations:
|
||||
|
||||
The audit name must be different from all other audits, unless IF NOT
|
||||
EXISTS precedes it, in which case the existing audit must be identical
|
||||
to the new definition. Case sensitivity and length limit are the same
|
||||
as for table names.
|
||||
- `AUTH`, `ADMIN`, and `DCL` are already global once their category is enabled.
|
||||
- `DDL`, `DML`, and `QUERY` need a matching keyspace or table.
|
||||
- An empty `audit_keyspaces` means "audit no keyspaces", not "audit every keyspace".
|
||||
- There is no role-based filter; the authenticated user is recorded in the log but is not part of the decision.
|
||||
- The exact implementation to preserve is in `audit/audit.cc` (`should_log()`, `inspect()`, and `inspect_login()`).
|
||||
|
||||
A trigger kind (i.e., `KEYSPACE`, `TABLE`, `ROLE`, or `CATEGORY`) can be
|
||||
specified at most once.
|
||||
## Proposed configuration
|
||||
|
||||
## Show an audit
|
||||
### 1. Add `audit_all_keyspaces`
|
||||
|
||||
```cql
|
||||
DESCRIBE AUDIT [audit-name ...];
|
||||
Introduce a new live-update boolean option:
|
||||
|
||||
Examples:
|
||||
|
||||
```yaml
|
||||
# Audit all keyspaces for matching categories
|
||||
audit_all_keyspaces: true
|
||||
|
||||
# Audit all keyspaces for selected roles
|
||||
audit_all_keyspaces: true
|
||||
audit_roles: "alice,bob"
|
||||
```
|
||||
|
||||
Prints definitions of all audits named herein. If no names are
|
||||
provided, prints all audits.
|
||||
Semantics:
|
||||
|
||||
## Delete an audit
|
||||
- `audit_all_keyspaces: false` keeps the existing behavior.
|
||||
- `audit_all_keyspaces: true` makes every keyspace match.
|
||||
- `audit_keyspaces` keeps its existing meaning: an explicit list of keyspaces, or no keyspace-wide auditing when left empty.
|
||||
- `audit_all_keyspaces: true` and a non-empty `audit_keyspaces` must be rejected as invalid configuration,
|
||||
because the two options express overlapping scope in different ways.
|
||||
- A dedicated boolean is preferable to overloading `audit_keyspaces`, because it avoids changing the meaning of existing configurations.
|
||||
- This also keeps the behavior aligned with today's `audit_tables` handling, where leaving `audit_tables` empty does not introduce a new wildcard syntax.
|
||||
|
||||
```cql
|
||||
DROP AUDIT audit-name;
|
||||
### 2. Add `audit_roles`
|
||||
|
||||
Introduce a new live-update configuration option:
|
||||
|
||||
```yaml
|
||||
audit_roles: "alice,bob,service_api"
|
||||
```
|
||||
|
||||
Stops logging events specified by this audit. Doesn't impact the
|
||||
already logged events. If the target is a table, it remains as it is.
|
||||
Semantics:
|
||||
|
||||
## Alter an audit
|
||||
- empty `audit_roles` means **no role filtering**, preserving today's behavior
|
||||
- non-empty `audit_roles` means audit only requests whose effective logged username matches one of the configured roles
|
||||
- matching is byte-for-byte exact, using the same role name that is already written to the audit record's `username` column / syslog field
|
||||
- the prototype should compare against the post-authentication role name from the session and audit log,
|
||||
with no additional case folding or role-graph expansion
|
||||
|
||||
```cql
|
||||
ALTER AUDIT audit-name WITH {same syntax as CREATE}
|
||||
Examples:
|
||||
|
||||
```yaml
|
||||
# Audit all roles in a single keyspace (current behavior, made explicit)
|
||||
audit_keyspaces: "ks1"
|
||||
audit_roles: ""
|
||||
|
||||
# Audit two roles across all keyspaces
|
||||
audit_all_keyspaces: true
|
||||
audit_roles: "alice,bob"
|
||||
|
||||
# Audit a service role, but only for selected tables
|
||||
audit_tables: "ks1.orders,ks1.payments"
|
||||
audit_roles: "billing_service"
|
||||
```
|
||||
|
||||
Any trigger provided will be updated (or newly created, if previously
|
||||
absent). To drop a trigger, use `IN *`.
|
||||
## Decision rule after the change
|
||||
|
||||
## Permissions
|
||||
After the prototype, the rule becomes:
|
||||
|
||||
Only superusers can modify audits or turn them on and off.
|
||||
```text
|
||||
category matches
|
||||
&& role matches
|
||||
&& (
|
||||
category in {AUTH, ADMIN, DCL}
|
||||
|| audit_all_keyspaces
|
||||
|| keyspace is listed in audit_keyspaces
|
||||
|| table is listed in audit_tables
|
||||
)
|
||||
```
|
||||
|
||||
Only superusers can read tables that are audit targets; no user can
|
||||
modify them. Only superusers can drop tables that are audit targets,
|
||||
after the audit itself is dropped. If a superuser doesn't drop a
|
||||
target table, it remains in existence indefinitely.
|
||||
Where:
|
||||
|
||||
# Implementation
|
||||
- `role matches` is always true when `audit_roles` is empty
|
||||
- `audit_all_keyspaces` is true when the new boolean option is enabled
|
||||
|
||||
## Efficient trigger evaluation
|
||||
For login auditing, the rule is simply:
|
||||
|
||||
```text
|
||||
AUTH category enabled && role matches(login username)
|
||||
```
|
||||
|
||||
## Implementation details
|
||||
|
||||
### Configuration parsing
|
||||
|
||||
Add two new config entries:
|
||||
|
||||
- `db::config::audit_all_keyspaces`
|
||||
- `db::config::audit_roles`
|
||||
|
||||
They should mirror the existing audit selectors:
|
||||
|
||||
- `audit_all_keyspaces`: type `named_value<bool>`, liveness `LiveUpdate`, default `false`
|
||||
- `audit_roles`: type `named_value<sstring>`, liveness `LiveUpdate`, default empty string
|
||||
|
||||
Parsing changes:
|
||||
|
||||
- keep `parse_audit_tables()` as-is
|
||||
- keep `parse_audit_keyspaces()` semantics as-is
|
||||
- add `parse_audit_roles()` that returns a set of role names
|
||||
- normalize empty or whitespace-only keyspace lists to an empty configuration rather than treating them as real keyspace names
|
||||
- add cross-field validation so `audit_all_keyspaces: true` cannot be combined with a non-empty
|
||||
`audit_keyspaces`, both at startup and during live updates
|
||||
|
||||
To avoid re-parsing on every request, the `audit::audit` service should store:
|
||||
|
||||
```c++
|
||||
namespace audit {
|
||||
|
||||
/// Stores triggers from an AUDIT statement.
|
||||
class triggers {
|
||||
// Use trie structures for speedy string lookup.
|
||||
optional<trie> _ks_trigger, _tbl_trigger, _usr_trigger;
|
||||
|
||||
// A logical-AND filter.
|
||||
optional<unsigned> _cat_trigger;
|
||||
|
||||
public:
|
||||
/// True iff every non-null trigger matches the corresponding ainf element.
|
||||
bool should_audit(const audit_info& ainf);
|
||||
};
|
||||
|
||||
} // namespace audit
|
||||
bool _audit_all_keyspaces;
|
||||
std::set<sstring> _audited_keyspaces;
|
||||
std::set<sstring> _audited_roles;
|
||||
```
|
||||
|
||||
To prevent modification of target tables, `audit::inspect()` will
|
||||
check the statement and throw if it is disallowed, similar to what
|
||||
`check_access()` currently does.
|
||||
Using a dedicated boolean keeps the hot-path check straightforward and avoids reinterpreting the existing
|
||||
`_audited_keyspaces` selector.
|
||||
|
||||
## Persisting audit definitions
|
||||
Using `std::set` for the explicit selectors keeps the prototype aligned with the current implementation and minimizes code churn.
|
||||
If profiling later shows lookup cost matters here, the container choice can be revisited independently of the feature semantics.
|
||||
|
||||
Obviously, an audit definition must survive a server restart and stay
|
||||
consistent among all nodes in a cluster. We'll accomplish both by
|
||||
storing audits in a system table.
|
||||
### Audit object changes
|
||||
|
||||
The current `audit_info` already carries:
|
||||
|
||||
- category
|
||||
- keyspace
|
||||
- table
|
||||
- query text
|
||||
|
||||
The username is available separately from `service::query_state` and is already passed to storage helpers when an entry is written.
|
||||
For the prototype there is no need to duplicate the username into `audit_info`.
|
||||
|
||||
Instead:
|
||||
|
||||
- change `should_log()` to take the effective username as an additional input
|
||||
- change `should_log_login()` to check the username against `audit_roles`
|
||||
- keep the storage helpers unchanged, because they already persist the username
|
||||
- update the existing internal call sites in `inspect()` and `inspect_login()` to pass the username through
|
||||
|
||||
One possible interface shape is:
|
||||
|
||||
```c++
|
||||
bool should_log(std::string_view username, const audit_info* info) const;
|
||||
bool should_log_login(std::string_view username) const;
|
||||
```
|
||||
|
||||
### Role semantics
|
||||
|
||||
For the prototype, "role" means the role name already associated with the current client session:
|
||||
|
||||
- successful authenticated sessions use the session's user name
|
||||
- failed login events use the login name from the authentication attempt
|
||||
- failed login events are still subject to `audit_roles`, matched against the attempted login name
|
||||
|
||||
This keeps the feature easy to explain and aligns the filter with what users already see in audit output.
|
||||
|
||||
The prototype should **not** try to expand inherited roles. If a user logs in as `alice` and inherits permissions from another role,
|
||||
the audit filter still matches `alice`. This keeps the behavior deterministic and avoids expensive role graph lookups on the request path.
|
||||
|
||||
### Keyspace semantics
|
||||
|
||||
`audit_all_keyspaces: true` should affect any statement whose `audit_info` carries a keyspace name.
|
||||
|
||||
Important consequences:
|
||||
|
||||
- it makes `DDL` / `DML` / `QUERY` auditing effectively cluster-wide
|
||||
- it does not change the existing global handling of `AUTH`, `ADMIN`, and `DCL`
|
||||
- statements that naturally have no keyspace name continue to depend on their category-specific behavior
|
||||
|
||||
No extra schema or metadata scan is required: the request already carries the keyspace information needed for the decision.
|
||||
|
||||
## Backwards compatibility
|
||||
|
||||
This design keeps existing behavior intact:
|
||||
|
||||
- existing clusters that do not set `audit_roles` continue to audit all roles
|
||||
- existing clusters that leave `audit_keyspaces` empty continue to audit no keyspaces
|
||||
- existing explicit keyspace/table lists keep their current meaning
|
||||
|
||||
The feature is enabled only by a new explicit boolean, so existing `audit_keyspaces` values do not need to be reinterpreted.
|
||||
The only newly-invalid combination is enabling `audit_all_keyspaces` while also listing explicit keyspaces.
|
||||
|
||||
## Operational considerations
|
||||
|
||||
### Performance and volume
|
||||
|
||||
`audit_all_keyspaces: true` can significantly increase audit volume, especially with `QUERY` and `DML`.
|
||||
|
||||
The intended mitigation is to combine it with:
|
||||
|
||||
- a narrow `audit_categories`
|
||||
- a narrow `audit_roles`
|
||||
|
||||
That combination gives operators a simple and cheap filter model:
|
||||
|
||||
- first by category
|
||||
- then by role
|
||||
- then by keyspace/table scope
|
||||
|
||||
### Live updates
|
||||
|
||||
`audit_roles` should follow the same live-update behavior as the current audit filters.
|
||||
|
||||
Changing:
|
||||
|
||||
- `audit_roles`
|
||||
- `audit_all_keyspaces`
|
||||
- `audit_keyspaces`
|
||||
- `audit_tables`
|
||||
- `audit_categories`
|
||||
|
||||
should update the in-memory selectors on all shards without restarting the node.
|
||||
|
||||
### Prototype limitation
|
||||
|
||||
Because matching is done against the authenticated session role name, `audit_roles` cannot express "audit everyone who inherits role X".
|
||||
Operators must list the concrete login roles they want to audit. This is a deliberate trade-off in the prototype to keep matching cheap
|
||||
and avoid role graph lookups on every audited request.
|
||||
|
||||
Example: if `alice` inherits permissions from `admin_role`, configuring `audit_roles: "admin_role"` would not audit requests from
|
||||
`alice`; to audit those requests, `alice` itself must be listed.
|
||||
|
||||
### Audit table schema
|
||||
|
||||
No schema change is needed. The audit table already includes `username`, which is sufficient for both storage and later analysis.
|
||||
|
||||
## Testing plan
|
||||
|
||||
The prototype should extend existing audit coverage rather than introduce a separate test framework.
|
||||
|
||||
### Parser / unit coverage
|
||||
|
||||
Add focused tests for:
|
||||
|
||||
- empty `audit_roles`
|
||||
- specific `audit_roles`
|
||||
- `audit_all_keyspaces: true`
|
||||
- invalid mixed configuration: `audit_all_keyspaces: true` with non-empty `audit_keyspaces`
|
||||
- empty or whitespace-only keyspace lists such as `",,,"` or `" "`, which should normalize to an empty configuration and therefore audit no keyspaces
|
||||
- boolean config parsing for `audit_all_keyspaces`
|
||||
|
||||
### Behavioral coverage
|
||||
|
||||
Extend the existing audit tests in `test/cluster/dtest/audit_test.py` with scenarios such as:
|
||||
|
||||
1. `audit_all_keyspaces: true` audits statements in multiple keyspaces without listing them explicitly
|
||||
2. `audit_roles: "alice"` logs requests from `alice` but not from `bob`
|
||||
3. `audit_all_keyspaces: true` + `audit_roles: "alice"` only logs `alice`'s traffic cluster-wide
|
||||
4. login auditing respects `audit_roles`
|
||||
5. live-updating `audit_roles` changes behavior without restart
|
||||
6. setting `audit_all_keyspaces: true` together with explicit `audit_keyspaces` is rejected with a clear error
|
||||
|
||||
## Future evolution
|
||||
|
||||
This prototype is deliberately small, but it fits a broader audit-spec design if we decide to revisit that later.
|
||||
|
||||
In a future CQL-driven design, these two additions map naturally to triggers such as:
|
||||
|
||||
- `TRIGGER KEYSPACE IN *`
|
||||
- `TRIGGER ROLE IN (...)`
|
||||
|
||||
That means the prototype is not throwaway work: it improves the current operational model immediately while keeping a clean path
|
||||
toward richer audit objects in the future.
|
||||
|
||||
2
main.cc
2
main.cc
@@ -2236,7 +2236,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return m.start();
|
||||
}).get();
|
||||
|
||||
api::set_server_storage_service(ctx, ss, group0_client).get();
|
||||
api::set_server_storage_service(ctx, ss, snapshot_ctl, group0_client).get();
|
||||
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
|
||||
api::unset_server_storage_service(ctx).get();
|
||||
});
|
||||
|
||||
@@ -371,7 +371,7 @@ public:
|
||||
}
|
||||
|
||||
void on_preemptive_aborted() {
|
||||
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
|
||||
if (_state != reader_permit::state::waiting_for_admission) {
|
||||
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
|
||||
}
|
||||
|
||||
@@ -1533,19 +1533,24 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
// + permit.timeout() < db::no_timeout -- to avoid preemptively aborting reads without timeout.
|
||||
// Useful in tests when _preemptive_abort_factor is set to 1.0
|
||||
// to avoid additional sleeps to wait for the read to be shed.
|
||||
const auto time_budget = permit.timeout() - permit.created();
|
||||
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
|
||||
if (remaining_time > db::timeout_clock::duration::zero() &&
|
||||
permit.timeout() < db::no_timeout &&
|
||||
remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
permit.on_preemptive_aborted();
|
||||
using ms = std::chrono::milliseconds;
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
|
||||
_name,
|
||||
std::chrono::duration_cast<ms>(time_budget - remaining_time),
|
||||
std::chrono::duration_cast<ms>(time_budget),
|
||||
_preemptive_abort_factor());
|
||||
continue;
|
||||
//
|
||||
// Only apply to permits waiting for admission -- permits waiting for memory are already
|
||||
// executing reads and should not be preemptively aborted.
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_admission) {
|
||||
const auto time_budget = permit.timeout() - permit.created();
|
||||
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
|
||||
if (remaining_time > db::timeout_clock::duration::zero() &&
|
||||
permit.timeout() < db::no_timeout &&
|
||||
remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
permit.on_preemptive_aborted();
|
||||
using ms = std::chrono::milliseconds;
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
|
||||
_name,
|
||||
std::chrono::duration_cast<ms>(time_budget - remaining_time),
|
||||
std::chrono::duration_cast<ms>(time_budget),
|
||||
_preemptive_abort_factor());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
|
||||
@@ -2362,6 +2362,15 @@ static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
|
||||
}
|
||||
}
|
||||
|
||||
// Produces an element-by-element copy of `rows`. After every copied element
// the coroutine awaits seastar::coroutine::maybe_yield(), so copying a long
// container is split into many small steps rather than one uninterrupted loop.
static future<repair_rows_on_wire> clone_gently(const repair_rows_on_wire& rows) {
    repair_rows_on_wire copy;
    for (auto it = rows.begin(); it != rows.end(); ++it) {
        copy.push_back(*it);
        co_await seastar::coroutine::maybe_yield();
    }
    co_return copy;
}
|
||||
|
||||
static future<> repair_put_row_diff_with_rpc_stream_process_op(
|
||||
sharded<repair_service>& repair,
|
||||
locator::host_id from,
|
||||
@@ -2388,7 +2397,9 @@ static future<> repair_put_row_diff_with_rpc_stream_process_op(
|
||||
co_await rm->put_row_diff_handler(std::move(*fp));
|
||||
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
|
||||
} else {
|
||||
co_await rm->put_row_diff_handler(*fp);
|
||||
// Gently clone to avoid copy stall on destination shard
|
||||
repair_rows_on_wire local_rows = co_await clone_gently(*fp);
|
||||
co_await seastar::when_all_succeed(rm->put_row_diff_handler(std::move(local_rows)), utils::clear_gently(fp));
|
||||
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -2794,12 +2794,18 @@ future<> storage_service::raft_decommission() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::decommission() {
|
||||
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
future<> storage_service::decommission(sharded<db::snapshot_ctl>& snapshot_ctl) {
|
||||
return run_with_api_lock(sstring("decommission"), [&] (storage_service& ss) {
|
||||
return seastar::async([&] {
|
||||
if (ss._operation_mode != mode::NORMAL) {
|
||||
throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
|
||||
}
|
||||
|
||||
snapshot_ctl.invoke_on_all([](auto& sctl) {
|
||||
return sctl.disable_all_operations();
|
||||
}).get();
|
||||
slogger.info("DECOMMISSIONING: disabled backup and snapshots");
|
||||
|
||||
ss.raft_decommission().get();
|
||||
|
||||
ss.stop_transport().get();
|
||||
|
||||
@@ -77,6 +77,7 @@ namespace db {
|
||||
class system_distributed_keyspace;
|
||||
class system_keyspace;
|
||||
class batchlog_manager;
|
||||
class snapshot_ctl;
|
||||
namespace view {
|
||||
class view_builder;
|
||||
class view_building_worker;
|
||||
@@ -666,7 +667,7 @@ private:
|
||||
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const;
|
||||
|
||||
public:
|
||||
future<> decommission();
|
||||
future<> decommission(sharded<db::snapshot_ctl>&);
|
||||
|
||||
private:
|
||||
future<> unbootstrap();
|
||||
|
||||
@@ -471,14 +471,17 @@ def test_streams_operations(test_table_s, dynamodbstreams, metrics):
|
||||
# to update latencies for one kind of operation (#17616, and compare #9406),
|
||||
# and to do that checking that ..._count increases for that op is enough.
|
||||
@contextmanager
|
||||
def check_sets_latency(metrics, operation_names):
|
||||
def check_sets_latency_by_metric(metrics, operation_names, metric_name):
|
||||
the_metrics = get_metrics(metrics)
|
||||
saved_latency_count = { x: get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': x}, the_metrics) for x in operation_names }
|
||||
saved_latency_count = { x: get_metric(metrics, f'{metric_name}_count', {'op': x}, the_metrics) for x in operation_names }
|
||||
yield
|
||||
the_metrics = get_metrics(metrics)
|
||||
for op in operation_names:
|
||||
# The total "count" on all shards should strictly increase
|
||||
assert saved_latency_count[op] < get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': op}, the_metrics)
|
||||
assert saved_latency_count[op] < get_metric(metrics, f'{metric_name}_count', {'op': op}, the_metrics)
|
||||
|
||||
def check_sets_latency(metrics, operation_names):
    # Convenience form of check_sets_latency_by_metric() bound to the
    # 'scylla_alternator_op_latency' metric; returns whatever that call returns.
    default_metric = 'scylla_alternator_op_latency'
    return check_sets_latency_by_metric(metrics, operation_names, default_metric)
|
||||
|
||||
# Test latency metrics for PutItem, GetItem, DeleteItem, UpdateItem.
|
||||
# We can't check what exactly the latency is - just that it gets updated.
|
||||
@@ -494,6 +497,18 @@ def test_item_latency(test_table_s, metrics):
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
def test_item_latency_per_table(test_table_s, metrics):
    # Runs one request of each item operation against the test table while
    # check_sets_latency_by_metric() watches the
    # 'scylla_alternator_table_op_latency' metric for those operations.
    tracked_ops = ['DeleteItem', 'GetItem', 'PutItem', 'UpdateItem', 'BatchWriteItem', 'BatchGetItem']
    with check_sets_latency_by_metric(metrics, tracked_ops, 'scylla_alternator_table_op_latency'):
        pk = random_string()
        test_table_s.put_item(Item={'p': pk})
        test_table_s.get_item(Key={'p': pk})
        test_table_s.delete_item(Key={'p': pk})
        test_table_s.update_item(Key={'p': pk})

        # The batch operations go through the low-level client and each use a
        # freshly generated key.
        write_request = {
            test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]}
        test_table_s.meta.client.batch_write_item(RequestItems=write_request)

        read_request = {
            test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}}
        test_table_s.meta.client.batch_get_item(RequestItems=read_request)
|
||||
|
||||
# Test latency metrics for GetRecords. Other Streams-related operations -
|
||||
# ListStreams, DescribeStream, and GetShardIterator, have an operation
|
||||
# count (tested above) but do NOT currently have a latency histogram.
|
||||
|
||||
@@ -2409,6 +2409,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_aborted_permit) {
|
||||
simple_schema s;
|
||||
const auto schema = s.schema();
|
||||
const std::string test_name = get_name();
|
||||
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
// Ensure permits are shed immediately during admission.
|
||||
@@ -2421,27 +2425,24 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abort_preemptively_ab
|
||||
|
||||
// Set a ridiculously long timeout to ensure permit will not be rejected due to timeout
|
||||
auto timeout = db::timeout_clock::now() + 60min;
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, timeout, {}).get();
|
||||
auto permit1 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
|
||||
auto units1 = permit1.request_memory(2024).get();
|
||||
|
||||
auto permit2_units1 = permit2.request_memory(1024).get();
|
||||
auto permit1_units = permit1.request_memory(8 * 1024).get();
|
||||
reader_permit_opt permit2_holder;
|
||||
auto permit2_fut = semaphore.with_permit(schema, test_name.c_str(), 1024, timeout, {}, permit2_holder, [] (reader_permit) {
|
||||
BOOST_FAIL("unexpected call to with permit lambda");
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
// permit1 is now the blessed one
|
||||
// Triggers maybe_admit_waiters()
|
||||
units1.reset_to_zero();
|
||||
|
||||
auto permit2_units2_fut = permit2.request_memory(1024);
|
||||
BOOST_REQUIRE(!permit2_units2_fut.available());
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1);
|
||||
BOOST_REQUIRE(eventually_true([&] { return permit2_fut.failed(); }));
|
||||
BOOST_REQUIRE_THROW(permit2_fut.get(), named_semaphore_aborted);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
|
||||
|
||||
permit1_units.reset_to_zero();
|
||||
|
||||
const auto futures_failed = eventually_true([&] { return permit2_units2_fut.failed(); });
|
||||
BOOST_CHECK(futures_failed);
|
||||
BOOST_CHECK_EQUAL(semaphore.get_stats().total_reads_shed_due_to_overload, 1);
|
||||
|
||||
simple_schema ss;
|
||||
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(ss.schema(), permit2));
|
||||
BOOST_REQUIRE_THROW(permit2_units2_fut.get(), named_semaphore_aborted);
|
||||
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(schema, *permit2_holder));
|
||||
BOOST_CHECK(!irh);
|
||||
}
|
||||
|
||||
/// Test that if no count resources are currently used, a single permit is always admitted regardless of available memory.
|
||||
@@ -2555,4 +2556,43 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
|
||||
}
|
||||
}
|
||||
|
||||
// Reproducer for https://scylladb.atlassian.net/browse/SCYLLADB-1016
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_preemptive_abort_requested_memory_leak) {
|
||||
const ssize_t memory = 1024;
|
||||
const uint32_t serialize_limit_multiplier = 2;
|
||||
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(),
|
||||
2, // count
|
||||
memory,
|
||||
100, // max queue length
|
||||
utils::updateable_value(serialize_limit_multiplier),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()), // kill limit multiplier
|
||||
utils::updateable_value<uint32_t>(1), // cpu concurrency
|
||||
utils::updateable_value<float>(1.0f)); // preemptive abort factor
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, "permit1", memory/2, db::no_timeout, {}).get();
|
||||
reader_permit_opt permit2 = semaphore.obtain_permit(nullptr, "permit2", memory/2, db::timeout_clock::now() + 60s, {}).get();
|
||||
|
||||
auto units1 = permit1.request_memory(memory * serialize_limit_multiplier).get();
|
||||
auto mem_fut = permit2->request_memory(1024);
|
||||
BOOST_REQUIRE(!mem_fut.available());
|
||||
|
||||
// Triggers maybe_admit_waiters()
|
||||
units1.reset_to_zero();
|
||||
|
||||
// Consume mem_fut to properly account for the 1024 bytes consumed by
|
||||
// on_granted_memory(). In debug mode, the .then() continuation that creates
|
||||
// the resource_units may be deferred due to yielding, so we must .get() it
|
||||
// to ensure the resource_units is created and can be properly destroyed.
|
||||
{ auto u = mem_fut.get(); }
|
||||
|
||||
// on_granted_memory() consumes stale _requested_memory (1024) + 512,
|
||||
// but resource_units only tracks 512 — the difference leaks.
|
||||
{ auto u = permit2->request_memory(512).get(); }
|
||||
|
||||
// Shouldn't fail if SCYLLADB-1016 is fixed.
|
||||
permit2 = {};
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -61,7 +61,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
|
||||
session = c.connect()
|
||||
logging.info("Performing SELECT(*) FROM system.clients")
|
||||
res = session.execute("SELECT COUNT(*) FROM system.clients WHERE connection_stage = 'AUTHENTICATING' ALLOW FILTERING;")
|
||||
count = res[0][0]
|
||||
count = res.one()[0]
|
||||
logging.info(f"Observed {count} AUTHENTICATING connections...")
|
||||
if count >= num_connections/2:
|
||||
connections_observed = True
|
||||
|
||||
@@ -11,7 +11,7 @@ import pytest
|
||||
import time
|
||||
import random
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.manager_client import ManagerClient, ServerInfo
|
||||
from test.cluster.object_store.conftest import format_tuples
|
||||
from test.cluster.util import wait_for_cql_and_get_hosts, get_replication, new_test_keyspace
|
||||
from test.pylib.rest_client import read_barrier
|
||||
@@ -19,6 +19,7 @@ from test.pylib.util import unique_name, wait_all
|
||||
from cassandra.cluster import ConsistencyLevel
|
||||
from collections import defaultdict
|
||||
from test.pylib.util import wait_for
|
||||
from test.pylib.rest_client import HTTPError
|
||||
import statistics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -174,9 +175,8 @@ async def test_backup_endpoint_config_is_live_updateable(manager: ManagerClient,
|
||||
assert status is not None
|
||||
assert status['state'] == 'done'
|
||||
|
||||
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
async def do_test_backup_helper(manager: ManagerClient, object_storage,
|
||||
breakpoint_name, handler, num_servers: int = 1):
|
||||
'''helper for backup abort testing'''
|
||||
|
||||
objconf = object_storage.create_endpoint_conf()
|
||||
@@ -186,10 +186,9 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
server = (await manager.servers_add(num_servers, config=cfg, cmdline=cmd))[0]
|
||||
ks, cf = create_ks_and_cf(manager.get_cql())
|
||||
snap_name, files = await take_snapshot_on_one_server(ks, server, manager, logger)
|
||||
assert len(files) > 1
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
|
||||
@@ -206,26 +205,35 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
|
||||
|
||||
print(f'Started task {tid}, aborting it early')
|
||||
await log.wait_for(breakpoint_name + ': waiting', from_mark=mark)
|
||||
await manager.api.abort_task(server.ip_addr, tid)
|
||||
await manager.api.message_injection(server.ip_addr, breakpoint_name)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
print(f'Status: {status}')
|
||||
assert (status is not None) and (status['state'] == 'failed')
|
||||
assert "seastar::abort_requested_exception (abort requested)" in status['error']
|
||||
await handler(server, prefix, files, tid)
|
||||
|
||||
objects = set(o.key for o in object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all())
|
||||
uploaded_count = 0
|
||||
for f in files:
|
||||
in_backup = f'{prefix}/{f}' in objects
|
||||
print(f'Check {f} is in backup: {in_backup}')
|
||||
if in_backup:
|
||||
uploaded_count += 1
|
||||
# Note: since s3 client is abortable and run async, we might fail even the first file
|
||||
# regardless of if we set the abort status before or after the upload is initiated.
|
||||
# Parallelism is a pain.
|
||||
assert min_files <= uploaded_count < len(files)
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
async def do_test_backup_abort(manager: ManagerClient, object_storage,
                               breakpoint_name, min_files, max_files = None):
    '''Abort flavour of the backup helper.

    Hands do_test_backup_helper() a callback that aborts the backup task,
    releases the `breakpoint_name` injection, expects the task to end up
    failed with an abort error, and then checks how many snapshot files
    reached the bucket: at least `min_files`, fewer than all of them and,
    when `max_files` is given, fewer than `max_files`.
    '''

    async def abort_and_check(server, prefix, files, tid):
        assert len(files) > 1

        # Request the abort first, then let the task run past the breakpoint.
        await manager.api.abort_task(server.ip_addr, tid)
        await manager.api.message_injection(server.ip_addr, breakpoint_name)
        status = await manager.api.wait_task(server.ip_addr, tid)
        print(f'Status: {status}')
        assert status is not None and status['state'] == 'failed'
        assert "seastar::abort_requested_exception (abort requested)" in status['error']

        bucket = object_storage.get_resource().Bucket(object_storage.bucket_name)
        stored_keys = {obj.key for obj in bucket.objects.all()}
        presence = []
        for name in files:
            found = f'{prefix}/{name}' in stored_keys
            print(f'Check {name} is in backup: {found}')
            presence.append(found)
        uploaded = sum(presence)

        # Note: since s3 client is abortable and run async, we might fail even the first file
        # regardless of if we set the abort status before or after the upload is initiated.
        # Parallelism is a pain.
        assert min_files <= uploaded < len(files)
        assert max_files is None or uploaded < max_files

    await do_test_backup_helper(manager, object_storage, breakpoint_name, abort_and_check)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
@@ -456,7 +464,8 @@ class topo:
|
||||
self.racks = racks
|
||||
self.dcs = dcs
|
||||
|
||||
async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, object_storage=None):
|
||||
async def create_cluster(topology, manager, logger, object_storage=None):
|
||||
rf_rack_valid_keyspaces = (topology.rf <= topology.racks)
|
||||
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
|
||||
|
||||
cfg = {'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
|
||||
@@ -679,19 +688,19 @@ class SSTablesOnObjectStorage:
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology_rf_validity", [
|
||||
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
|
||||
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
|
||||
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
|
||||
@pytest.mark.parametrize("topology", [
|
||||
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
|
||||
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
|
||||
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
|
||||
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
|
||||
])
|
||||
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology_rf_validity):
|
||||
async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerClient, object_storage, topology):
|
||||
'''Check that restoring of a cluster with stream scopes works'''
|
||||
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnObjectStorage(object_storage))
|
||||
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnObjectStorage(object_storage))
|
||||
|
||||
|
||||
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity, sstables_storage):
|
||||
async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topology, sstables_storage):
|
||||
'''
|
||||
This test creates a cluster specified by the topology parameter above,
|
||||
configurable number of nodes, racks, datacenters, and replication factor.
|
||||
@@ -707,9 +716,7 @@ async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topo
|
||||
This stage parses the logs and checks that the data was streamed to nodes within the configured scope.
|
||||
'''
|
||||
|
||||
topology, rf_rack_valid_keyspaces = topology_rf_validity
|
||||
|
||||
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, sstables_storage.object_storage)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger, sstables_storage.object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
@@ -858,7 +865,8 @@ async def test_backup_broken_streaming(manager: ManagerClient, s3_storage):
|
||||
|
||||
res = cql.execute(f"SELECT COUNT(*) FROM {keyspace}.{table} BYPASS CACHE USING TIMEOUT 600s;")
|
||||
|
||||
assert res[0].count == expected_rows, f"number of rows after restore is incorrect: {res[0].count}"
|
||||
row = res.one()
|
||||
assert row.count == expected_rows, f"number of rows after restore is incorrect: {row.count}"
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
await log.wait_for("fully contained SSTables to local node from object storage", timeout=10)
|
||||
# just make sure we had partially contained sstables as well
|
||||
@@ -878,7 +886,7 @@ async def test_restore_primary_replica_same_domain(manager: ManagerClient, objec
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
@@ -940,7 +948,7 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
servers, host_ids = await create_cluster(topology, True if domain == 'rack' else False, manager, logger, object_storage)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
@@ -971,3 +979,39 @@ async def test_restore_primary_replica_different_domain(manager: ManagerClient,
|
||||
streamed_to = set([ r[1].group(1) for r in res ])
|
||||
logger.info(f'{s.ip_addr} {host_ids[s.server_id]} streamed to {streamed_to}')
|
||||
assert len(streamed_to) == 2
|
||||
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_decommision_waits_for_backup(manager: ManagerClient, object_storage):
    '''Check that decommission is held back while a snapshot backup is still running.'''

    injection = "backup_task_pre_upload"

    async def decommission_and_check(server: ServerInfo, prefix: str, files, tid):
        log = await manager.server_open_log(server.server_id)
        start_mark = await log.mark()

        async def finish_backup():
            # Proceed only once the node logs that it is waiting for the
            # snapshot/backup tasks.
            await log.wait_for("Waiting for snapshot/backup tasks to finish", from_mark=start_mark)

            after_wait_mark = await log.mark()

            # Release the injection so the backup can run, then wait for the task.
            await manager.api.message_injection(server.ip_addr, injection)
            status = await manager.api.wait_task(server.ip_addr, tid)
            assert status is not None and status['state'] == 'done'

            bucket = object_storage.get_resource().Bucket(object_storage.bucket_name)
            stored_keys = {obj.key for obj in bucket.objects.all()}
            # all files should be uploaded. note: can be zero due to two nodes
            presence = []
            for name in files:
                found = f'{prefix}/{name}' in stored_keys
                print(f'Check {name} is in backup: {found}')
                presence.append(found)
            assert sum(presence) == len(files)

            # Finally wait until the node logs that it disabled backup and
            # snapshots as part of decommissioning.
            await log.wait_for("DECOMMISSIONING: disabled backup and snapshots", from_mark=after_wait_mark)

        # The decommission request and the backup-finishing steps run concurrently.
        await asyncio.gather(manager.decommission_node(server.server_id), finish_backup())

    await do_test_backup_helper(manager, object_storage, injection, decommission_and_check, 2)
|
||||
|
||||
|
||||
@@ -44,7 +44,6 @@ async def test_gossiper_empty_self_id_on_shadow_round(manager: ManagerClient):
|
||||
|
||||
logging.info("Starting cluster normally")
|
||||
node1 = await manager.server_add(cmdline=cmdline, start=False, config=cfg)
|
||||
manager.server_add(cmdline=cmdline, start=False)
|
||||
node1_log = await manager.server_open_log(node1.server_id)
|
||||
node2 = await manager.server_add(cmdline=cmdline, start=False, seeds=[node1.ip_addr])
|
||||
node2_log = await manager.server_open_log(node2.server_id)
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
from test.pylib.internal_types import ServerNum
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot, InjectionHandler, read_barrier
|
||||
from test.cluster.util import create_new_test_keyspace
|
||||
@@ -19,6 +20,20 @@ def fixture_raft_op_timeout(build_mode):
|
||||
return 10000 if build_mode == 'debug' else 1000
|
||||
|
||||
|
||||
async def update_group0_raft_op_timeout(server_id: ServerNum, manager: ManagerClient, timeout: int) -> None:
    """Set `group0_raft_op_timeout_in_ms` to `timeout` on server `server_id`.

    If the server is among the manager's running servers, this also waits
    (up to 60 seconds) for the log line that reports the configuration file
    was re-read; otherwise it only updates the config.
    """
    option = 'group0_raft_op_timeout_in_ms'
    logger.info(f"Updating group0_raft_op_timeout_in_ms on server {server_id} to {timeout}")

    running = await manager.running_servers()
    if not any(srv.server_id == server_id for srv in running):
        await manager.server_update_config(server_id, option, timeout)
        return

    # If the node is alive, server_update_config only sends the SIGHUP signal to the Scylla process, so awaiting it
    # doesn't guarantee that the new config file is active. Work around this by looking at the logs.
    log_file = await manager.server_open_log(server_id)
    mark = await log_file.mark()
    await manager.server_update_config(server_id, option, timeout)
    await log_file.wait_for("completed re-reading configuration file", from_mark=mark, timeout=60)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
|
||||
@@ -41,7 +56,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
|
||||
|
||||
config = {
|
||||
'direct_failure_detector_ping_timeout_in_ms': 300,
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -63,6 +77,10 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
|
||||
manager.server_stop_gracefully(servers[3].server_id),
|
||||
manager.server_stop_gracefully(servers[4].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout)
|
||||
for srv in servers[:2]))
|
||||
|
||||
logger.info("starting a sixth node with no quorum")
|
||||
await manager.server_add(expected_error="raft operation \\[read_barrier\\] timed out, there is no raft quorum",
|
||||
timeout=60)
|
||||
@@ -75,7 +93,6 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
|
||||
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
|
||||
async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_timeout: int) -> None:
|
||||
config = {
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -106,6 +123,9 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time
|
||||
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
|
||||
manager.server_stop_gracefully(servers[2].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
|
||||
|
||||
logger.info("release join-node-before-add-entry injection")
|
||||
await injection_handler.message()
|
||||
|
||||
@@ -125,7 +145,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
|
||||
|
||||
logger.info("adding a fourth node")
|
||||
servers += [await manager.server_add(config={
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -152,6 +171,9 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
|
||||
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
|
||||
manager.server_stop_gracefully(servers[2].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await update_group0_raft_op_timeout(servers[3].server_id, manager, raft_op_timeout)
|
||||
|
||||
logger.info("release join-node-response_handler-before-read-barrier injection")
|
||||
injection_handler = InjectionHandler(manager.api,
|
||||
'join-node-response_handler-before-read-barrier',
|
||||
@@ -168,7 +190,6 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
|
||||
async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: int) -> None:
|
||||
logger.info("starting a first node (the leader)")
|
||||
servers = [await manager.server_add(config={
|
||||
'group0_raft_op_timeout_in_ms': raft_op_timeout,
|
||||
'error_injections_at_startup': [
|
||||
{
|
||||
'name': 'raft-group-registry-fd-threshold-in-ms',
|
||||
@@ -188,6 +209,9 @@ async def test_cannot_run_operations(manager: ManagerClient, raft_op_timeout: in
|
||||
await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
|
||||
manager.server_stop_gracefully(servers[2].server_id))
|
||||
|
||||
# Do it here to prevent unexpected timeouts before quorum loss.
|
||||
await update_group0_raft_op_timeout(servers[0].server_id, manager, raft_op_timeout)
|
||||
|
||||
logger.info("attempting removenode for the second node")
|
||||
await manager.remove_node(servers[0].server_id, servers[1].server_id,
|
||||
expected_error="raft operation [read_barrier] timed out, there is no raft quorum",
|
||||
@@ -231,9 +255,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
|
||||
await asyncio.gather(*(manager.server_stop(srv.server_id) for srv in servers))
|
||||
|
||||
# This ensures the read barriers below fail quickly without group 0 quorum.
|
||||
logger.info(f"Decreasing group0_raft_op_timeout_in_ms on {servers}")
|
||||
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', raft_op_timeout)
|
||||
for srv in servers))
|
||||
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, raft_op_timeout) for srv in servers))
|
||||
|
||||
logger.info(f"Restarting {servers[:2]} with no group 0 quorum")
|
||||
for idx, srv in enumerate(servers[:2]):
|
||||
@@ -245,8 +267,7 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
|
||||
|
||||
# Increase the timeout back to 300s to ensure the new group 0 leader is elected before the first read barrier below
|
||||
# times out.
|
||||
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', 300000)
|
||||
for srv in servers))
|
||||
await asyncio.gather(*(update_group0_raft_op_timeout(srv.server_id, manager, 300000) for srv in servers))
|
||||
|
||||
logger.info(f"Restarting {servers[2:]} with group 0 quorum")
|
||||
for srv in servers[2:]:
|
||||
|
||||
@@ -75,16 +75,16 @@ class SSTablesOnLocalStorage:
|
||||
await asyncio.gather(*(self.refresh_one(manager, s, ks, cf, sstables, scope, primary_replica_only) for s, sstables in sstables_per_server.items()))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("topology_rf_validity", [
|
||||
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
|
||||
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
|
||||
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
|
||||
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
|
||||
@pytest.mark.parametrize("topology", [
|
||||
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
|
||||
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
|
||||
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
|
||||
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
|
||||
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
|
||||
])
|
||||
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
|
||||
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology):
|
||||
'''Check that refreshing of a cluster with stream scopes works'''
|
||||
await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, SSTablesOnLocalStorage())
|
||||
await do_test_streaming_scopes(build_mode, manager, topology, SSTablesOnLocalStorage())
|
||||
|
||||
|
||||
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
@@ -94,7 +94,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
|
||||
topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)
|
||||
|
||||
servers, host_ids = await create_cluster(topology, True, manager, logger)
|
||||
servers, host_ids = await create_cluster(topology, manager, logger)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ async def test_snapshot_on_all_nodes(manager: ManagerClient):
|
||||
"""
|
||||
topology = topo(rf = 3, nodes = 3, racks = 3, dcs = 1)
|
||||
|
||||
servers, _ = await create_cluster(topology, True, manager, logger)
|
||||
servers, _ = await create_cluster(topology, manager, logger)
|
||||
|
||||
snapshot_name = unique_name('snap_')
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ async def test_tombstone_gc_insert_flush(manager: ManagerClient):
|
||||
async def test_tablet_manual_repair_all_tokens(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
|
||||
token = "all"
|
||||
now = datetime.datetime.utcnow()
|
||||
now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
|
||||
map1 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
|
||||
|
||||
await guarantee_repair_time_next_second()
|
||||
|
||||
@@ -1324,7 +1324,7 @@ async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, n
|
||||
return servers
|
||||
|
||||
|
||||
class TestContext:
|
||||
class Context:
|
||||
def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
|
||||
self.ks = ks
|
||||
self.table = table
|
||||
@@ -1347,7 +1347,7 @@ async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial
|
||||
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
|
||||
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
|
||||
yield TestContext(ks, table, rf, initial_tablets, num_keys)
|
||||
yield Context(ks, table, rf, initial_tablets, num_keys)
|
||||
finally:
|
||||
await cql.run_async(f"DROP KEYSPACE {ks}")
|
||||
|
||||
|
||||
@@ -53,6 +53,11 @@ filterwarnings =
|
||||
ignore::DeprecationWarning:importlib._bootstrap
|
||||
ignore::DeprecationWarning:botocore
|
||||
ignore::DeprecationWarning:pytest_elk_reporter
|
||||
ignore::sqlalchemy.exc.MovedIn20Warning:kmip.pie.sqltypes
|
||||
ignore::cryptography.utils.CryptographyDeprecationWarning:kmip
|
||||
ignore:TypeDecorator .* will not produce a cache key:sqlalchemy.exc.SAWarning:kmip
|
||||
ignore:Exception in thread[\s\S]*kmip[\s\S]*shutdown:pytest.PytestUnhandledThreadExceptionWarning
|
||||
|
||||
|
||||
tmp_path_retention_count = 1
|
||||
tmp_path_retention_policy = failed
|
||||
|
||||
Reference in New Issue
Block a user