Compare commits

...

73 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
ea26e4b3a5 Use tablet_task_info fields for tablet task timing
For tablet_virtual_task, set creation_time to tablet_task_info::request_time
and start_time to tablet_task_info::sched_time, matching the actual semantics
of when the request was created vs when it was scheduled for execution.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 15:05:50 +00:00
copilot-swe-agent[bot]
3b793ef09f Merge topology_request_tracking_mutation_builder calls for keyspace_rf_change
Combined the two separate topology_request_tracking_mutation_builder calls
into one, setting both start_time and done status in the same mutation builder
to reduce redundancy.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:53:23 +00:00
copilot-swe-agent[bot]
df0a59ba03 Fix start_time setting to be together with operation start
Refactored start_time setting for global requests to be included in the same
mutation batch that starts the actual operation, matching the pattern used for
node operations. This avoids an extra update_topology_state call and ensures
start_time is set atomically with the operation start.

Also updated nodetool tasks documentation to include creation_time field in
example outputs for status, list, and tree commands.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:40:01 +00:00
copilot-swe-agent[bot]
69024a09b2 Add creation_time field to nodetool tasks subcommands
Extended scylla-nodetool.cc to display creation_time in all task-related outputs:
- tasks_print_status: Added creation_time to time field formatting
- tasks_print_trees: Added creation_time column to task tree display
- tasks_print_stats_list: Added creation_time column to task stats list

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:28:01 +00:00
copilot-swe-agent[bot]
bb8f28a1ab Fix start_time setting: remove from request creation, add to execution start
Remove incorrect start_time setting from request creation sites for:
- cleanup requests
- new_cdc_generation requests
- truncate_table requests
- keyspace_rf_change requests

Add start_time setting in topology_coordinator::handle_global_request
when execution begins, matching the pattern for node operations.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 14:09:22 +00:00
copilot-swe-agent[bot]
32bc7e3a1c Add creation_time field to task status and stats API
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 13:26:09 +00:00
copilot-swe-agent[bot]
fb4e37248d Initial plan 2025-12-08 13:03:16 +00:00
Avi Kivity
45c16553eb Revert "Update tools/cqlsh submodule"
This reverts commit ff1b212319. In this
commit, the python driver was updated to 3.29.6. That version has a
serious flaw - it rejects compression=None settings [1] which
cqlsh (legitimately) uses in copyutil.py.

The reason this hasn't caused numerous continuous integration failures
is that the submodule update commit did not update the frozen toolchain,
so the build was effectively running with an older version of the driver.

Fix by reverting the change. This allows us to regenerate the frozen
toolchain when we need to.

Reverted changes:

* tools/cqlsh 2240122...6badc99 (2):
  > Update scylla-driver version to 3.29.6
  > Revert "Migrate workflows to Blacksmith"

[1] 78f554236f

Closes scylladb/scylladb#27473
2025-12-08 08:50:52 +02:00
Nadav Har'El
c984f557ef Merge 'alternator: eliminate cross shard ::free for do_batch_write' from Petr Gusev
This is an optimization follow-up [for this PR](https://github.com/scylladb/scylladb/pull/27396#issuecomment-3611410774): avoiding destruction of foreign objects on the wrong shard. Releasing objects allocated on a different shard causes their ::free calls to be executed remotely, which adds unnecessary load to the SMP subsystem.

Before this PR, a `std::vector<put_or_delete_item>` could be moved to another shard. When the vector was eventually destroyed, its ::free had to be marshalled back to the shard where the memory had originally been allocated. This change avoids that overhead by passing the vector by const reference instead.

backport: not needed, this is an optimization

Closes scylladb/scylladb#27432

* github.com:scylladb/scylladb:
  alternator/executor.cc: avoid cross-shard free
  storage_proxy: cas: take cas_request by raw reference
2025-12-07 22:54:36 +02:00
Andrei Chekun
5e83311305 test.py: switch to ThreadPoolExecutor
With python 3.14, the Process fails due to pickling issue with nodes objects.
This will eliminate this issue, so we can bump up the python version.

Closes scylladb/scylladb#27456
2025-12-07 17:37:25 +02:00
Petr Gusev
f00f7976c1 alternator/executor.cc: avoid cross-shard free
This commit is an optimization: avoiding destruction of
foreign objects on the wrong shard. Releasing objects allocated on a
different shard causes their ::free calls to be executed remotely,
which adds unnecessary load to the SMP subsystem.

Before this patch, a std::vector could be moved
to another shard. When the vector was eventually destroyed,
its ::free had to be marshalled back to the shard where the memory had
originally been allocated. This change avoids that overhead by passing
the vector by const reference instead.

The referenced objects lifetime correctness reasoning:
* the put_or_delete_item refs usages in put_or_delete_item_cas_request
are bound to its lifetime
* cas_request lifetime is bound to storage_proxy::cas future
* we don't release put_or_delete_item-s untill all storage_proxy::cas
calls are done.
2025-12-07 16:14:56 +01:00
Petr Gusev
c428645d16 storage_proxy: cas: take cas_request by raw reference
In the next commit we want to add an optimization that relies on
precise control over the lifetime of cas_request. In particular, we
want the implementation of this interface in Alternator to operate on
raw references that are guaranteed to remain valid only until the
cas() future is resolved. We already depend on the same lifetime
assumptions in cas_request when used by modification_statement.
However, these assumptions are not clearly expressed in the current
interface: cas_request is taken by shared_ptr, and nothing prevents
cas() from storing that pointer inside paxos_response_handler, which
may outlive the cas() future.

This commit fixes that by taking cas_request by raw reference. This
makes it explicit that cas() does not assume ownership of the object.
Callers must ensure that the referenced object remains valid until
the returned future is resolved.
2025-12-07 16:14:56 +01:00
Tomasz Grabiec
082342ecad Attach names to allocating sections for better debuggability
Large reserves in allocating_section can cause stalls. We already log
reserve increase, but we don't know which table it belongs to:

  lsa - LSA allocation failure, increasing reserve in section 0x600009f94590 to 128 segments;

Allocating sections used for updating row cache on memtable flush are
notoriously problematic. Each table has its own row_cache, so its own
allocating_section(s). If we attached table name to those sections, we
could identify which table is causing problems. In some issues we
suspected system.raft, but we can't be sure.

This patch allows naming allocating_sections for the purpose of
identifying them in such log messages. I use abstract_formatter for
this purpose to avoid the cost of formatting strings on the hot path
(e.g. index_reader). And also to avoid duplicating strings which are
already stored elsewhere.

Fixes #25799

Closes scylladb/scylladb#27470
2025-12-07 14:14:25 +02:00
Avi Kivity
47efbdffbc Merge 'cache, mvcc: Preempt cache update when applying range tombstone from memtable' from Tomasz Grabiec
Range tombstones are represented as entry attributes, which applies to
the interval between entries. So if a range tombstone covers many
rows, to apply it we have to update all covered entries.  In some
workloads that could be many entries, even the whole cache.  Before
the patch, we did this update without preemption, which can cause
reactor stalls in such workloads.

This scenario is already covered by mvcc_tests,
e.g. test_apply_to_incomplete_respects_continuity. And I verified that
the new preemption point is hit in the test.

perf-row-cache-update results show no significant stalls anymore (max
2ms scheduling delay, instead of previous 1.5 s):

    Generated 1124195 rows
    Memtable fill took 4179.457520 [ms], {count: 8295, 99%: 0.654949 [ms], max: 32.817176 [ms]}
    Draining...
    took 0.000616 [ms]
    cache: 2506/2948 [MB], memtable: 781/1024 [MB], alloc/comp: 1051/662 [MB] (amp: 0.630)
    update: 2874.157471 [ms], preemption: {count: 26650, 99%: 1.131752 [ms], max: 2.068762 [ms]}, cache: 3027/3973 [MB], alloc/comp: 3951/2424 [MB] (amp: 0.614), pr/me/dr 1124195/0/0

Fixes #23479
Fixes #2578

Closes scylladb/scylladb#27469

* github.com:scylladb/scylladb:
  cache, mvcc: Preempt cache update when applying range tombstone from memtable
  partition_snapshot_row_cursor: Clarify non-obvious semantic difference of range_tombstone()
  perf-row-cache-update: Add scenario with large tombstone covering many rows
2025-12-07 11:54:15 +02:00
Avi Kivity
d811eeb4ca Merge 'Make direct failure detector verb handler more efficient' from Gleb Natapov
We saw that in large clusters direct failure detector may cause large task queues to be accumulated. The series address this issue and also moves the code into the correct scheduling group.

Fixes https://github.com/scylladb/scylladb/issues/27142

Backport to all version where 60f1053087 was backported to since it should improve performance in large clusters.

Closes scylladb/scylladb#27387

* github.com:scylladb/scylladb:
  direct_failure_detector: run direct failure detector in the gossiper scheduling group
  raft: drop invoke_on from the pinger verb handler
  direct_failure_detector: pass timeout to direct_fd_ping verb
2025-12-07 11:40:26 +02:00
Marcin Maliszkiewicz
4784e39665 auth: fix ctor signature of certificate_authenticator
In b9199e8b24 we
added cache argument to constructor of authenticators
but certificate_authenticator was ommited. Class
registrator sadly only fails in runtime for such
cases.

Fixes https://github.com/scylladb/scylladb/issues/27431

Closes scylladb/scylladb#27434
2025-12-07 11:18:42 +02:00
Tomasz Grabiec
d4014b7970 Drop legacy schema support
We switched to using v3 schema tables (in system_schema keyspace) in
2017, in 9eb91bc30b.

So no system should have the old schema any more.

No need to run legacy_schema_migrator on boot.

Closes scylladb/scylladb#27420
2025-12-07 00:09:13 +02:00
Tomasz Grabiec
92b5e4d63d cache, mvcc: Preempt cache update when applying range tombstone from memtable
Range tombstones are represented as entry attributes, which applies to
the interval between entries. So if a range tombstone covers many
rows, to apply it we have to update all covered entries.  In some
workloads that could be many entries, even the whole cache.  Before
the patch, we did this update without preemption, which can cause
reactor stalls in such workloads.

This scenario is already covered by mvcc_tests,
e.g. test_apply_to_incomplete_respects_continuity. And I verified that
the new preemption point is hit in the test.

perf-row-cache-update results show no significant stalls anymore (max
2ms scheduling delay, instead of previous 1.5 s):

Generated 1124195 rows
Memtable fill took 4179.457520 [ms], {count: 8295, 99%: 0.654949 [ms], max: 32.817176 [ms]}
Draining...
took 0.000616 [ms]
cache: 2506/2948 [MB], memtable: 781/1024 [MB], alloc/comp: 1051/662 [MB] (amp: 0.630)
update: 2874.157471 [ms], preemption: {count: 26650, 99%: 1.131752 [ms], max: 2.068762 [ms]}, cache: 3027/3973 [MB], alloc/comp: 3951/2424 [MB] (amp: 0.614), pr/me/dr 1124195/0/0

Fixes #23479
Fixes #2578
2025-12-06 13:45:35 +01:00
Tomasz Grabiec
e546143fd9 partition_snapshot_row_cursor: Clarify non-obvious semantic difference of range_tombstone() 2025-12-06 01:03:10 +01:00
Tomasz Grabiec
721434054b perf-row-cache-update: Add scenario with large tombstone covering many rows
Fills memtable with rows and a tombstone which deletes all rows which
are already in cache.

Similar to raft log workload, but more extreme.

With -c1 -m4G, observed really bad performance:

update: 1711.976196 [ms], preemption: {count: 22603, 99%: 0.943127 [ms], max: 1494.571776 [ms]}, cache: 2148/2906 [MB], alloc/comp: 1334/869 [MB] (amp: 0.651), pr/me/dr 1062186/0/1062187
cache: 2148/2906 [MB], memtable: 738/1024 [MB], alloc/comp: 993/0 [MB] (amp: 0.000)

Which means that max reactor stall during cache update was 1.5 [s]
0.7 GB memtables. 2.1 GB in cache.
2025-12-06 01:03:09 +01:00
Nadav Har'El
350cbd1d66 alternator: fix typo of BatchWriteItem in comments
The DynamoDB API's "BatchWriteItem" operation is spelled like this, in
singular. Some comments incorrectly referred to as BatchWriteItems - in
plural. This patch fixes those mistakes.

There are no functional changes here or changes to user-facing documents -
these mistakes were only in code comments.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#27446
2025-12-05 15:08:58 +02:00
Botond Dénes
866c96f536 Merge 'Add digests for all sstable components in scylla metadata' from Taras Veretilnyk
This pull request adds support for calculation and storing CRC32 digests for all SSTable components.
This change replaces plain file_writer with crc32_digest_file_writer for all SSTable components that should be checksummed. The resulting component digests are stored in the sstable structure
and later persisted to disk as part of the Scylla metadata component during writer::consume_end_of_stream.
All important SSTable components (Index, Partitions, Rows, Summary, Filter, CompressionInfo, and TOC) are covered.
Several test cases where introduced to verify expected behaviour.

Backport is not required, it is a new feature

Fixes #20100

Closes scylladb/scylladb#27287

* github.com:scylladb/scylladb:
  sstable_test: add verification testcases of SSTable components digests persistance
  sstables: store digest of all sstable components in scylla metadata
  sstables: Add TemporaryScylla metadata component type
  sstables: Extract file writer closing logic into separate methods
  sstables: Add components_digests to scylla metadata components
  sstables: Implement CRC32 digest-only writer
2025-12-05 11:36:50 +02:00
Botond Dénes
367633270a Merge 'EAR: handle IPV6 hosts in KMIP and use shared (improved) http parser in AWS/Azure' from Calle Wilund
Fixes #27367
Fixes #27362
Fixes #27366

Makes http URL parser handle IPv6.
Makes KMIP host setup handle IPv6 hosts + use system trust if no truststore set
Moves Azure/KMS code to use shared http URL parser to avoid same regex everywhere.

Closes scylladb/scylladb#27368

* github.com:scylladb/scylladb:
  ear::kms/ear::azure: Use utils::http URL parsing
  ear::kmip_host: Handle ipv6 hosts + use system trust when not specified
  utils::http: Handle ipv6 numeric host part in URL:s
2025-12-05 10:43:07 +02:00
Asias He
e97a504775 repair: Allow min max range to be updated for repair history
It is observed that:

repair - repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: Failed to update
system.repair_history table of node d27de212-6f32-4649ad76-a9ef1165fdcb:
seastar::rpc::remote_verb_error (repair[667d4a59-63fb-4ca6-8feb-98da49946d8b]: range (minimum
token,maximum token) is not in the format of (start, end])

This is because repair checks the end of the range to be repaired needs
to be inclusive. When small_table_optimization is enabled for regular
repair, a (minimum token,maximum token) will be used.

To fix, we can relax the check of (start, end] for the min max range.

Fixes #27220

Closes scylladb/scylladb#27357
2025-12-05 10:41:25 +02:00
Anna Stuchlik
a5c971d21c doc: update the upgrade policy to cover non-consecutive minor upgrades
Fixes https://github.com/scylladb/scylladb/issues/27308

Closes scylladb/scylladb#27319
2025-12-05 10:31:53 +02:00
Guy Shtub
a0809f0032 Update integration-jaeger.rst
Fixing broken link in Jaeger Docs to ScyllaDB

Closes scylladb/scylladb#26406
2025-12-05 10:23:07 +02:00
Piotr Dulikowski
bb6e41f97a index: allow vector indexes without rf_rack_valid_keyspces
The rf_rack_valid_keyspaces option needs to be turned on in order to
allow creating materialized views in tablet keyspaces with numeric RF
per DC. This is also necessary for secondary indexes because they use
materialized views underneath. However, this option is _not_ necessary
for vector store indexes because those use the external vector store
service for querying the list of keys to fetch from the main table, they
do not create a materialized view. The rf_rack_valid_keyspaces was, by
accident, required for vector indexes, too.

Remove the restriction for vector store indexes as it is completely
unnecessary.

Fixes: SCYLLADB-81

Closes scylladb/scylladb#27447
2025-12-05 09:26:26 +02:00
Marcin Maliszkiewicz
4df6b51ac2 auth: fix cache::prune_all roles iteration
During b9199e8b24
reivew it was suggested to use standard for loop
but when erasing element it causes increment on
invalid iterator, as role could have been erased
before.

This change brings back original code.

Fixes: https://github.com/scylladb/scylladb/issues/27422
Backport: no, offending commit not released yet

Closes scylladb/scylladb#27444
2025-12-04 23:35:54 +01:00
Taras Veretilnyk
0c8730ba05 sstable_test: add verification testcases of SSTable components digests persistance
Adds a generic test helper that writes a random SSTable, reloads it, and
verifies that the persisted CRC32 digest for each component matches the
digest computed from disk. Those covers all checksummed components test cases.
2025-12-04 21:09:01 +01:00
Taras Veretilnyk
bc2e83bc1f sstables: store digest of all sstable components in scylla metadata
This change replaces plain file_writer with crc32_digest_file_writer
for all SSTable components that should be checksummed. The resulting component
digests are stored in the sstable structure and later persisted to disk
as part of the Scylla metadata component during writer::consume_end_of_stream.
2025-12-04 21:00:09 +01:00
Patryk Jędrzejczak
f4c3d5c1b7 Merge 'fix test_coordinator_queue_management flakiness' from Gleb Natapov
After 39cec4ae45 node join may fail with either "request canceled" notification or (very rarely) because it was banned. Depend on timing. The series fixes the test to check for both possibilities.

Fixes #27320

No need to backport since the flakiness is in the mater only.

Closes scylladb/scylladb#27408

* https://github.com/scylladb/scylladb:
  test: fix test_coordinator_queue_management flakiness
  test/pylib: allow expected_error in server_start to contain regular expression
2025-12-04 16:08:02 +01:00
Tomasz Grabiec
e54abde3e8 Merge 'main: delay setup of storage_service REST API' from Andrzej Jackowski
The storage_service REST API uses `group0` internally. Before this
patch, it was possible to send an HTTP request before `group0` was
initialized, which resulted in a segmentation fault. Therefore,
this patch delays the setup of the storage_service REST API.

Additionally, `test_rest_api_on_startup` is added to reproduce the problem.

Fixes: https://github.com/scylladb/scylladb/issues/27130

No backport. It's a crash fix but possible only if a request is sent in a very specific phase of a node start.

Closes scylladb/scylladb#27410

* github.com:scylladb/scylladb:
  test: add test_rest_api_on_startup
  main: delay setup of storage_service REST API
2025-12-04 14:56:49 +01:00
Avi Kivity
9696ee64d0 database: fix overflow when computing data distribution over shards
We store the per-shard chunk count in a uint64_t vector
global_offset, and then convert the counts to offsets with
a prefix sum:

```c++
        // [1, 2, 3, 0] --> [0, 1, 3, 6]
        std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), 0, std::plus());
```

However, std::exclusive_scan takes the accumulator type from the
initial value, 0, which is an int, instead of from the range being
iterated, which is of uint64_t.

As a result, the prefix sum is computed as a 32-bit integer value. If
it exceeds 0x8000'0000, it becomes negative. It is then extended to
64 bits and stored. The result is a huge 64-bit number. Later on
we try to find an sstable with this chunk and fail, crashing on
an assertion.

An example of the failure can be seen here: https://godbolt.org/z/6M8aEbo57

The fix is simple: the initial value is passed as uint64_t instead of int.

Fixes https://github.com/scylladb/scylladb/issues/27417

Closes scylladb/scylladb#27418
2025-12-04 14:10:53 +01:00
Calle Wilund
8dd69f02a8 ear::kms/ear::azure: Use utils::http URL parsing
Fixes #27367

Move to reuse shared code.
2025-12-04 11:38:41 +00:00
Calle Wilund
d000fa3335 ear::kmip_host: Handle ipv6 hosts + use system trust when not specified
Fixes #27362

The KMIP host connector should handle ipv4 connections (named or numeric).
It also should fall back to system trust when truststore is not specified.
2025-12-04 11:38:41 +00:00
Calle Wilund
4e289e8e6a utils::http: Handle ipv6 numeric host part in URL:s
Fixes #27366

A URL with numeric host part formats special in case of ipv6,
to avoid confusion with port part.
The parser should handle this.

I.e.
http://[2001:db8:4006:812::200e]:8080

v2:
* Include scheme agnostic parse + case insensitive scheme matching
2025-12-04 11:38:41 +00:00
Botond Dénes
9d2f7c3f52 Merge 'mv: allow setting concurrency in PRUNE MATERIALIZED VIEW' from Wojciech Mitros
The PRUNE MATERALIZED VIEW statement is performed as follows:
1. Perform a range scan of the view table from the view replicas based
on the ranges specified in the statement.
2. While reading the paged scan above, for each view row perform a read
from all base replicas at the corresponding primary key. If a discrepancy
is detected, delete the row in the view table.

When reading multiple rows, this is very slow because for each view row
we need to performe a single row query on multiple replicas.
In this patch we add an option to speed this up by performing many of the
single base row reads concurrently, at the concurrency specified in the
USING CONCURRENCY clause.

Aside from the unit test, I checked manually on a 3-node cluster with 10M rows, using vnodes. There were actually no ghost rows in the test, but we still had to iterate over all view rows and read the corresponding base rows. And actual ghost rows, if there are any, should be a tiny fraction of all rows. I compared concurrencies 1,2,10,100 and the results were:
* Pruning with concurrency 1 took total 1416 seconds
* Pruning with concurrency 2 took total 731 seconds
* Pruning with concurrency 10 took total 234 seconds
* Pruning with concurrency 100 took total 171 seconds
So after a concurrency of 10 or so we're hitting diminishing returns (at least in this setup). At that point we may be no longer bottlenecked by the reads, but by CPU on the shard that's handling the PRUNE

Fixes https://github.com/scylladb/scylladb/issues/27070

Closes scylladb/scylladb#27097

* github.com:scylladb/scylladb:
  mv: allow setting concurrency in PRUNE MATERIALIZED VIEW
  cql: add CONCURRENCY to the USING clause
2025-12-04 11:47:41 +02:00
Aleksandra Martyniuk
e3e81a9a7a repair: throw if flush failed in get_flush_time
Currently, _flush_time was stored as a std::optional<gc_clock::time_point>
and std::nullopt indicates that the flush was needed but failed. It's confusing
for the caller and does not work as expected since the _flush_time is initialized
with value (not optional).

Change _flush_time type to gc_clock::time_point. If a flush is needed but failed,
get_flush_time() throws an exception.

This was suppose to be a part of https://github.com/scylladb/scylladb/pull/26319
but it was mistakenly overwritten during rebases.

Refs: https://github.com/scylladb/scylladb/issues/24415.

Closes scylladb/scylladb#26794
2025-12-04 11:45:53 +02:00
Gleb Natapov
86dde50c0d direct_failure_detector: run direct failure detector in the gossiper scheduling group
When direct failure detector was introduces the idea was that it will
run on the same connection raft group0 verbs are running, but in
60f1053087 raft verbs were moved to run on the gossiper connection
while DIRECT_FD_PING was left where it was. This patch move it to
gossiper connection as well and fix the pinger code to run in gossiper
scheduling group.
2025-12-04 11:35:43 +02:00
Gleb Natapov
6a6bbbf1a6 raft: drop invoke_on from the pinger verb handler
Currently raft direct pinger verb jumps to shard 0 to check if group0 is
alive before replying. The verb runs relatively often, so it is not very
efficient. The patch distributes group0 liveness information (as it
changes) to all shard instead, so that the handler itself does not need
to jump to shard 0.
2025-12-04 11:35:43 +02:00
Avi Kivity
b82f92b439 main: replace p11-kit hack for trust paths override with gnutls hack
p11-kit has hardcoded paths for the trust paths. Of course, each
Linux distribution hardcodes those paths differently. As a result,
our relocatable gnutls, which uses p11-kit-trust.so to process the
trust paths, needs some overrides to select the right paths.

Currently, we use p11_kit_override_system_files(), a p11-kit API
intended for testing, but which worked well enough for our purpose,
to override the trust module configuration.

Unfortunately, starting (presumably [1]) in gnutls 3.8.11, gnutls
changed how it works with p11-kit and our override is now ignored.

This was likely unintentional, but there appears to be a better way:
instead of letting gnutls auto-load the trust module from a hacked
configuration, we load the modules outselves using
gnutls_pkcs11_init(GNUTLS_PKCS11_FLAG_MANUAL) and
gnutls_pkcs11_add_provider(). These appear to be intended for the purpose.

We communicate the paths to the scylla executable using an environment
variable. This isn't optimal, but is much easier than adding a command
line variable since there are multiple levels of command line parsing due
to the subtool mechanism.

With this, we unlock the possibility to upgrade gnutls to newer versions.

[1] aa5f15a872

Closes scylladb/scylladb#27348
2025-12-04 11:33:51 +02:00
Gleb Natapov
f00e00fde0 test: fix test_coordinator_queue_management flakiness
After 39cec4ae45 node join may fail with either "request canceled"
notification or (very rarely) because it was banned. Depend on timing.
The patch fixes the test to check for both possibilities.
2025-12-04 11:06:20 +02:00
Gleb Natapov
b0727d3f2a test/pylib: allow expected_error in server_start to contain regular expression
Currently expected_error parameter to server_start can only work with
exact matches. Change it to support regular expressions.
2025-12-04 11:06:20 +02:00
Calle Wilund
4169bdb7a6 encryption::gcp_host: Add exponential retry for server errors
Fixes #27242

Similar to AWS, google services may at times simply return a 503,
more or less meaning "busy, please retry". We rely for most cases
higher up layers to handle said retry, but we cannot fully do so,
because both we reach this code sometimes through paths that do
no such thing, and also because it would be slightly inefficient,
since we'd like to for example control the back-off for auth etc.

This simply changes the existing retry loop in gcp_host to
be a little more forgiving, special case 503 errors and extend
the retry to the auth part, as well as re-use the
exponential_backoff_retry primitive.

v2:
* Avoid backoff if refreshing credentials. Should not add latency due to this.
* Only allow re-auth once per (non-service-failure-backoff) try.
* Add abort source to both request and retry
v3:
* Include timeout and other server errors in retry-backoff
v4:
* Reorder error code handling correctly

Closes scylladb/scylladb#27267
2025-12-04 10:13:37 +02:00
Anna Stuchlik
c5580399a8 replace the Driver pages with a link to the new Drivers pages
This commit removes the now redundant driver pages from
the Scylla DB documentation. Instead, the link to the pages
where we moved the diver information is added.
Also, the links are updated across the ScyllaDB manual.

Redirections are added for all the removed pages.

Fixes https://github.com/scylladb/scylladb/issues/26871

Closes scylladb/scylladb#27277
2025-12-04 10:07:27 +02:00
Tomasz Grabiec
1d42770936 Merge 'topology_coordinator: Add barrier to cleanup_target' from Łukasz Paszkowski
Consider the following scenario:
1. A table has RF=3 and writes use CL=QUORUM
2. One node is down
3. There is a pending tablet migration from the unavailable node
   that is reverted

During the revert, there can be a time window where the pending replica
being cleaned up still accepts writes. This leads to write failures,
as only two nodes (out of four) are able to acknowledge writes.

This patch fixes the issue by adding a barrier to the cleanup_target
tablet transition state, ensuring that the coordinator switches back to
the previous replica set before cleanup is triggered.

Fixes https://github.com/scylladb/scylladb/issues/26512

It's a pre existing issue. Backport is required to all recent 2025.x versions.

Closes scylladb/scylladb#27413

* github.com:scylladb/scylladb:
  topology_coordinator: Fix the indentation for the cleanup_target case
  topology_coordinator: Add barrier to cleanup_target
  test_node_failure_during_tablet_migration: Increase RF from 2 to 3
2025-12-03 23:57:45 +01:00
Taras Veretilnyk
d287b054b9 sstables: Add TemporaryScylla metadata component type
Add TemporaryScylla component type to make atomic updates of SSTable Scylla metadata using temporary files
and atomic rename operations possible. This will be needed in further commit to rewrite metadata together with
the statistics component.
2025-12-03 23:40:10 +01:00
Szymon Wasik
4f803aad22 Improve documentation of vector search configuration parameters.
This patch adds separate group for vector search parameters in the
documentation and fixes small typos and formatting.

Fixes: SCYLLADB-77.

Closes scylladb/scylladb#27385
2025-12-03 21:02:59 +02:00
Karol Nowacki
a54bf50290 vector_search: Fix requests hanging on unreachable nodes
When a vector store node becomes unreachable, a client request sent
before the keep-alive timer fires would hang until the CQL query
timeout was reached.

This occurred because the HTTP request writes to the TCP buffer and then
waits for a response. While data is in the buffer, TCP retransmissions
prevent the keep-alive timer from detecting the dead connection.

This patch resolves the issue by setting the `TCP_USER_TIMEOUT` socket
option, which applies an effective timeout to TCP retransmissions,
allowing the connection to fail faster.

Closes scylladb/scylladb#27388
2025-12-03 21:01:43 +02:00
Nadav Har'El
06dd3b2e64 install-dependencies.sh: add zlib
Scylla uses zlib, through the header <zlib.h>, in sstable compression.
We also want to use it in Alternator for gzip-compressed requests.

We never actually required zlib explicltly in install-dependencies.sh,
we only get it through transitive dependencies. But it's better to
require it explicitly so this is what we do in this patch.

In Fedora, we use the newer, more efficient, zlib-ng which is API-
compatible with the classic zlib. Unfortunately, the Debian zlib-ng
package is *not* drop-in compatible with zlib (you need to include
a different header file <zlib-ng.h>) so we use the classic zlib.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#27238
2025-12-03 19:30:36 +02:00
Łukasz Paszkowski
6163fedd2e topology_coordinator: Fix the indentation for the cleanup_target case 2025-12-03 16:37:33 +01:00
Łukasz Paszkowski
67f1c6d36c topology_coordinator: Add barrier to cleanup_target
Consider the following scenario:
1. A table has RF=3 and writes use CL=QUORUM
2. One node is down
3. There is a pending tablet migration from the unavailable node
   that is reverted

During the revert, there can be a time window where the pending replica
being cleaned up still accepts writes. This leads to write failures,
as only two nodes (out of four) are able to acknowledge writes.

This patch fixes the issue by adding a barrier to the cleanup_target
tablet transition state, ensuring that the coordinator switches back to
the previous replica set before cleanup is triggered.

Fixes https://github.com/scylladb/scylladb/issues/26512
2025-12-03 16:19:17 +01:00
Łukasz Paszkowski
669286b1d6 test_node_failure_during_tablet_migration: Increase RF from 2 to 3
The patch prepares the test for additional write workload to be
executed in parallel with node failures. With the original RF=2,
QUORUM is also 2, which causes writes to fail during node outage.

To address it, the third rack with a single node is added and the
replication factor is increased to 3.
2025-12-03 16:00:19 +01:00
Botond Dénes
b9199e8b24 Merge 'auth: use auth cache on login path' from Marcin Maliszkiewicz
Scylla currently has bad resiliency to connection storms. Nodes are easy to overload or impact their latency by unbound concurrency in making new connections on the client side. This can easily happen in bigger deployments where there are thousands of client instances, e.g. pods.

To improve resiliency we are introducing unified auth specialized cache to the system. This patch series is stage 1, where cache is used only on login path.

Dependency diagram:
```
|Authentication Layer|
            |
            v
+--------------------------------+
|          Auth Cache            |
+--------------------------------+
        ^                      |
        |                      |
        |                      v
|Raft Write Logic | | CQL Read Layer|
```

Cache invalidation is based on raft and the cache contains full content of related tables.

Ldap role manager may benefit partially as can_logic function is common  and will be cached,
but it still needs to query roles from external source.

Performance results:

For single shard connection/disconnection scenario insns/conn decreased by *5%*,
allocs/conn decreased by *23%*, tasks/conn decreased by *20%*. Results for 20 shards are very similar.

Raw data before:
```
≡ ◦ ⤖ rm -rf /tmp/scylla-data && build/release/scylla perf-cql-raw --workdir /tmp/scylla-data --smp 1 --developer-mode 1 --username cassandra --password cassandra --connection-per-request true 2> /dev/null
Running test with config: {workload=read, partitions=10000, concurrency=100, duration=5, ops_per_shard=0, auth, connection_per_request}
Pre-populated 10000 partitions
1128.55 tps (599.2 allocs/op,   0.0 logallocs/op, 145.2 tasks/op, 2586610 insns/op, 1350912 cycles/op,        0 errors)
1157.41 tps (601.3 allocs/op,   0.0 logallocs/op, 145.2 tasks/op, 2589046 insns/op, 1356691 cycles/op,        0 errors)
1167.42 tps (603.3 allocs/op,   0.0 logallocs/op, 145.2 tasks/op, 2603234 insns/op, 1360607 cycles/op,        0 errors)
1159.63 tps (605.9 allocs/op,   0.0 logallocs/op, 145.3 tasks/op, 2609977 insns/op, 1363935 cycles/op,        0 errors)
1165.12 tps (608.8 allocs/op,   0.0 logallocs/op, 145.2 tasks/op, 2625804 insns/op, 1365736 cycles/op,        0 errors)
throughput:
	mean=   1155.63 standard-deviation=15.66
	median= 1159.63 median-absolute-deviation=9.49
	maximum=1167.42 minimum=1128.55
instructions_per_op:
	mean=   2602934.31 standard-deviation=16063.01
	median= 2603234.19 median-absolute-deviation=13887.96
	maximum=2625804.05 minimum=2586609.82
cpu_cycles_per_op:
	mean=   1359576.30 standard-deviation=5945.69
	median= 1360607.05 median-absolute-deviation=4358.94
	maximum=1365736.42 minimum=1350912.10
```

Raw data after:
```
≡ ◦ ⤖ rm -rf /tmp/scylla-data && build/release/scylla perf-cql-raw --workdir /tmp/scylla-data --smp 1 --developer-mode 1 --username cassandra --password cassandra --connection-per-request true --duration 10 2> /dev/null
Running test with config: {workload=read, partitions=10000, concurrency=100, duration=10, ops_per_shard=0, auth, connection_per_request}
Pre-populated 10000 partitions
1132.09 tps (457.5 allocs/op,   0.0 logallocs/op, 115.1 tasks/op, 2432485 insns/op, 1270655 cycles/op,        0 errors)
1157.70 tps (458.4 allocs/op,   0.0 logallocs/op, 115.1 tasks/op, 2447779 insns/op, 1283768 cycles/op,        0 errors)
1162.86 tps (459.0 allocs/op,   0.0 logallocs/op, 115.1 tasks/op, 2463225 insns/op, 1291782 cycles/op,        0 errors)
1153.15 tps (460.2 allocs/op,   0.0 logallocs/op, 115.2 tasks/op, 2469230 insns/op, 1296381 cycles/op,        0 errors)
1142.09 tps (460.6 allocs/op,   0.0 logallocs/op, 115.1 tasks/op, 2478900 insns/op, 1299342 cycles/op,        0 errors)
1124.89 tps (462.5 allocs/op,   0.0 logallocs/op, 115.2 tasks/op, 2470962 insns/op, 1305026 cycles/op,        0 errors)
1156.75 tps (464.4 allocs/op,   0.0 logallocs/op, 115.1 tasks/op, 2493823 insns/op, 1305136 cycles/op,        0 errors)
1152.16 tps (466.3 allocs/op,   0.0 logallocs/op, 115.2 tasks/op, 2497246 insns/op, 1309816 cycles/op,        0 errors)
1154.77 tps (469.8 allocs/op,   0.0 logallocs/op, 115.5 tasks/op, 2571954 insns/op, 1345341 cycles/op,        0 errors)
1152.22 tps (472.4 allocs/op,   0.0 logallocs/op, 115.3 tasks/op, 2551954 insns/op, 1334202 cycles/op,        0 errors)
throughput:
	mean=   1148.87 standard-deviation=12.08
	median= 1153.15 median-absolute-deviation=7.88
	maximum=1162.86 minimum=1124.89
instructions_per_op:
	mean=   2487755.88 standard-deviation=43838.23
	median= 2478900.02 median-absolute-deviation=24531.06
	maximum=2571954.26 minimum=2432485.38
cpu_cycles_per_op:
	mean=   1304144.76 standard-deviation=22129.55
	median= 1305025.71 median-absolute-deviation=12363.25
	maximum=1345341.16 minimum=1270655.17
```

Fixes https://github.com/scylladb/scylladb/issues/18891
Backport: no, it's a new feature

Closes scylladb/scylladb#26841

* github.com:scylladb/scylladb:
  auth: use auth cache on login path
  auth: corutinize standard_role_manager::can_login
  main: auth: add auth cache dependency to auth service
  raft: update auth cache when data changes
  auth: storage_service: reload auth cache on v1 to v2 auth migration
  raft: reload auth cache on snapshot application
  service: add auth cache getter to storage service
  main: start auth cache service
  auth: add unified cache implementation
  auth: move table names to common.hh
2025-12-03 16:45:01 +02:00
Andrzej Jackowski
1ff7f5941b test: add test_rest_api_on_startup
This test verifies that REST API requests are handled properly
when a server is started or restarted. It is used to verify
the fix for scylladb/scylladb#27130, where a server failed with a
segmentation fault when `storage_service/raft_topology/reload` was
called too early.

Refs: scylladb/scylladb#27130
2025-12-03 15:35:59 +01:00
Andrzej Jackowski
3b70154f0a main: delay setup of storage_service REST API
The storage_service REST API uses `group0` internally. Before this
patch, it was possible to send an HTTP request before `group0` was
initialized, which resulted in a segmentation fault. Therefore,
this patch delays the setup of the storage_service REST API.

Fixes: scylladb/scylladb#27130
2025-12-03 15:35:54 +01:00
Pavel Emelyanov
6ae72ed134 test: Reuse S3 fixtures facilities in cqlpy/test_tools.py
Creating endpoint conf can be made with the s3_server method
Getting boto3 resource from s3_server itself is also possible

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27380
2025-12-03 16:32:54 +02:00
Gleb Natapov
82f80478b8 direct_failure_detector: pass timeout to direct_fd_ping verb
Currently direct_fd_ping runs without timeout, but the verb is not
waited forever, the wait is canceled after a timeout, this timeout
simply is not passed to the rpc. It may create a situation where the
rpc callback can runs on a destination but it is no longer waited on.
Change the code to pass timeout to rpc as well and return earlier from
the rpc handler if the timeout is reached by the time the callback is
called. This is backwards compatible since timeout is passed as
optional.
2025-12-02 14:55:20 +02:00
Taras Veretilnyk
a191503ddf sstables: Extract file writer closing logic into separate methods
Refactor the consume_end_of_stream() method by extracting the inline
file writer closing logic into dedicated methods:
- close_index_writer()
- close_partitions_writer()
- close_rows_writer()
2025-12-02 13:07:41 +01:00
Taras Veretilnyk
619bf3ac4b sstables: Add components_digests to scylla metadata components
Add components_digests struct with optional digest fields for storing CRC32 digests of individual SSTable components in Scylla metadata.
Those includes:
- Data
- Compression
- Filter
- Statistics
- Summary
- Index
- TOC
- Partitions
- Rows
2025-12-02 12:36:34 +01:00
Taras Veretilnyk
62802b119b sstables: Implement CRC32 digest-only writer
Introduce template parameter to checksummed file writer to support
digest-only calculation without storing chunk checksums.
This will be needed for future to calculate digest of other components.
2025-11-27 22:40:07 +01:00
Wojciech Mitros
323e5cd171 mv: allow setting concurrency in PRUNE MATERIALIZED VIEW
The PRUNE MATERALIZED VIEW statement is performed as follows:
1. Perform a range scan of the view table from the view replicas based
on the ranges specified in the statement.
2. While reading the paged scan above, for each view row perform a read
from all base replicas at the corresponding primary key. If a discrepancy
is detected, delete the row in the view table.

When reading multiple rows, this is very slow because for each view row
we need to performe a single row query on multiple replicas.
In this patch we add an option to speed this up by performing many of the
single base row reads concurrently, at the concurrency specified in the
USING CONCURRENCY clause.

Fixes https://github.com/scylladb/scylladb/issues/27070
2025-11-27 00:02:28 +01:00
Marcin Maliszkiewicz
dd461e0472 auth: use auth cache on login path
This path may become hot during connection storms
that's why we want it to stress the node as little
as possible.
2025-11-26 12:01:33 +01:00
Marcin Maliszkiewicz
0c9b2e5332 auth: corutinize standard_role_manager::can_login
Corutinize so that it's easier to add new logic
in following commit.
2025-11-26 12:01:32 +01:00
Marcin Maliszkiewicz
b29c42adce main: auth: add auth cache dependency to auth service
In the following commit we'll switch some authorizer
and role manager code to use the cache so we're preparing
the dependency.
2025-11-26 12:01:31 +01:00
Marcin Maliszkiewicz
ea3dc0b0de raft: update auth cache when data changes
When applying group0_command we now inspect
whether any auth internal tables were modified,
and reload affected role entries in the cache.

Since one auth DML may change multiple tables,
when iterating over mutations we deduplicate
affected roles across those tables.
2025-11-26 12:00:50 +01:00
Marcin Maliszkiewicz
2a6bef96d6 auth: storage_service: reload auth cache on v1 to v2 auth migration 2025-11-26 12:00:50 +01:00
Marcin Maliszkiewicz
19da1cb656 raft: reload auth cache on snapshot application
Receiving snaphot is a rare event so as a simplification
we'll be reloading the whole cache instead of trying to merge
states, especially that expected size is small, below 100 records.

Reloading is non-disruptive operation, old entries are removed
only after all entries are loaded. If entry is updated, shared
pointer will be atomically replaced in a cache map.
2025-11-26 12:00:50 +01:00
Marcin Maliszkiewicz
2cf1ca43b5 service: add auth cache getter to storage service
Prepare for use in a subsequent commit in group0_state_machine,
where the auth cache will be integrated. This follows the same
pattern as updates to the service-level cache, view-building
state, and CDC streams.
2025-11-26 12:00:50 +01:00
Marcin Maliszkiewicz
642f468c59 main: start auth cache service
The service is not yet used anywhere,
we first build scaffolding.
2025-11-26 12:00:50 +01:00
Marcin Maliszkiewicz
bd7c87731b auth: add unified cache implementation
It combines data from all underlying auth tables.
Supports gentle full load and per role reloads.
Loading is done on shard 0 and then deep copies data
to all shards.
2025-11-26 12:00:50 +01:00
Marcin Maliszkiewicz
4c667e87ec auth: move table names to common.hh
They will be used additionally in cache code, added
in following commits.
2025-11-26 12:00:50 +01:00
Wojciech Mitros
aacf883a8b cql: add CONCURRENCY to the USING clause
Currently, the PRUNE MATERIALIZED VIEW statement performs all its
reads and writes in a single, continous sequence. This takes too
much time even for a moderate amount of 'PRUNED' data.
Instead, we want to make it possible to set a concurrency of the
reads and writes performed while processing the PRUNE statement,
so that if the user so desires, it may finish the PRUNING quicker
at the cost of adding more load on the cluster.
In this patch we add the CONCURRENCY setting to the USING clause
in cql. In the next patch, we'll be using it to actually set the
concurrency of PRUNE MATERIALIZED VIEW.
2025-11-21 12:32:52 +01:00
155 changed files with 1751 additions and 1794 deletions

1
.github/CODEOWNERS vendored
View File

@@ -57,7 +57,6 @@ repair/* @tgrabiec @asias
# SCHEMA MANAGEMENT
db/schema_tables* @tgrabiec
db/legacy_schema_migrator* @tgrabiec
service/migration* @tgrabiec
schema* @tgrabiec

View File

@@ -2223,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
// The put_or_delete_item class builds the mutations needed by the PutItem and
// DeleteItem operations - either as stand-alone commands or part of a list
// of commands in BatchWriteItems.
// of commands in BatchWriteItem.
// put_or_delete_item splits each operation into two stages: Constructing the
// object parses and validates the user input (throwing exceptions if there
// are input errors). Later, build() generates the actual mutation, with a
// specified timestamp. This split is needed because of the peculiar needs of
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
@@ -2739,7 +2739,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
auto read_command = needs_read_before_write ?
previous_item_read_command(proxy, schema(), _ck, selection) :
nullptr;
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
if (!is_applied) {
@@ -3026,17 +3026,20 @@ struct primary_key_equal {
};
// This is a cas_request subclass for applying given put_or_delete_items to
// one partition using LWT as part as BatchWriteItems. This is a write-only
// one partition using LWT as part as BatchWriteItem. This is a write-only
// operation, not needing the previous value of the item (the mutation to be
// done is known prior to starting the operation). Nevertheless, we want to
// do this mutation via LWT to ensure that it is serialized with other LWT
// mutations to the same partition.
//
// The std::vector<put_or_delete_item> must remain alive until the
// storage_proxy::cas() future is resolved.
class put_or_delete_item_cas_request : public service::cas_request {
schema_ptr schema;
std::vector<put_or_delete_item> _mutation_builders;
const std::vector<put_or_delete_item>& _mutation_builders;
public:
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
schema(std::move(s)), _mutation_builders(std::move(b)) { }
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
schema(std::move(s)), _mutation_builders(b) { }
virtual ~put_or_delete_item_cas_request() = default;
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
std::optional<mutation> ret;
@@ -3052,20 +3055,21 @@ public:
}
};
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
// does not need to support conditional updates.
}
@@ -3131,30 +3135,34 @@ static future<> do_batch_write(service::storage_proxy& proxy,
// Multiple mutations may be destined for the same partition, adding
// or deleting different items of one partition. Join them together
// because we can do them in one cas() call.
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
using map_type = std::unordered_map<schema_decorated_key,
std::vector<put_or_delete_item>,
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
mb = e.second,
dk = e.first.dk,
&mb = e.second,
&dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
@@ -3168,11 +3176,11 @@ static future<> do_batch_write(service::storage_proxy& proxy,
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
});
}).finally([key_builders = std::move(key_builders)]{});
}
}

View File

@@ -349,9 +349,13 @@
"type":"long",
"description":"The shard the task is running on"
},
"creation_time":{
"type":"datetime",
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
},
"start_time":{
"type":"datetime",
"description":"The start time of the task; unspecified (equal to epoch) when state == created"
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
},
"end_time":{
"type":"datetime",
@@ -398,13 +402,17 @@
"type":"boolean",
"description":"Boolean flag indicating whether the task can be aborted"
},
"creation_time":{
"type":"datetime",
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
},
"start_time":{
"type":"datetime",
"description":"The start time of the task"
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
},
"end_time":{
"type":"datetime",
"description":"The end time of the task (unspecified when the task is not completed)"
"description":"The end time of the task (when execution completed); unspecified (equal to epoch) when the task is not completed"
},
"error":{
"type":"string",

View File

@@ -55,6 +55,7 @@ tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& g
res.scope = status.scope;
res.state = status.state;
res.is_abortable = bool(status.is_abortable);
res.creation_time = get_time(status.creation_time);
res.start_time = get_time(status.start_time);
res.end_time = get_time(status.end_time);
res.error = status.error;
@@ -83,6 +84,7 @@ tm::task_stats make_stats(tasks::task_stats stats) {
res.table = stats.table;
res.entity = stats.entity;
res.shard = stats.shard;
res.creation_time = get_time(stats.creation_time);
res.start_time = get_time(stats.start_time);
res.end_time = get_time(stats.end_time);;
return res;

View File

@@ -9,6 +9,7 @@ target_sources(scylla_auth
allow_all_authorizer.cc
authenticated_user.cc
authenticator.cc
cache.cc
certificate_authenticator.cc
common.cc
default_authorizer.cc

View File

@@ -23,6 +23,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -12,6 +12,7 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
}
virtual future<> start() override {

180
auth/cache.cc Normal file
View File

@@ -0,0 +1,180 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
auto it = _roles.find(role);
if (it == _roles.end()) {
return {};
}
return it->second;
}
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
auto rec = make_lw_shared<role_record>();
rec->version = _current_version;
auto fetch = [this, &role](const sstring& q) {
return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
internal_distributed_query_state(), {role},
cql3::query_processor::cache_internal::yes);
};
// roles
{
static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
auto rs = co_await fetch(q);
if (!rs->empty()) {
auto& r = rs->one();
rec->is_superuser = r.get_or<bool>("is_superuser", false);
rec->can_login = r.get_or<bool>("can_login", false);
rec->salted_hash = r.get_or<sstring>("salted_hash", "");
if (r.has("member_of")) {
auto mo = r.get_set<sstring>("member_of");
rec->member_of.insert(
std::make_move_iterator(mo.begin()),
std::make_move_iterator(mo.end()));
}
} else {
// role got deleted
co_return nullptr;
}
}
// members
{
static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->members.insert(r.get_as<sstring>("member"));
co_await coroutine::maybe_yield();
}
}
// attributes
{
static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->attributes[r.get_as<sstring>("name")] =
r.get_as<sstring>("value");
co_await coroutine::maybe_yield();
}
}
// permissions
{
static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
auto resource = r.get_as<sstring>("resource");
auto perms_strings = r.get_set<sstring>("permissions");
std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
auto pset = permissions::from_strings(perms_set);
rec->permissions[std::move(resource)] = std::move(pset);
co_await coroutine::maybe_yield();
}
}
co_return rec;
}
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
if (it->second->version != _current_version) {
_roles.erase(it++);
co_await coroutine::maybe_yield();
} else {
++it;
}
}
co_return;
}
future<> cache::load_all() {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
++_current_version;
logger.info("Loading all roles");
const uint32_t page_size = 128;
auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
const auto name = r.get_as<sstring>("role");
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
}
co_return stop_iteration::no;
};
co_await _qp.query_internal(format("SELECT * FROM {}.{}",
db::system_keyspace::NAME, meta::roles_table::name),
db::consistency_level::LOCAL_ONE, {}, page_size, loader);
co_await prune_all();
for (const auto& [name, role] : _roles) {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
c._current_version = _current_version;
co_await c.prune_all();
});
}
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
} else {
_roles.erase(name);
}
co_await distribute_role(name, role);
}
}
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c._roles.erase(name);
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c._roles[name] = std::move(role_copy);
});
}
bool cache::includes_table(const table_id& id) noexcept {
return id == db::system_keyspace::roles()->id()
|| id == db::system_keyspace::role_members()->id()
|| id == db::system_keyspace::role_attributes()->id()
|| id == db::system_keyspace::role_permissions()->id();
}
} // namespace auth

61
auth/cache.hh Normal file
View File

@@ -0,0 +1,61 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <absl/container/flat_hash_map.h>
#include "auth/permission.hh"
#include "auth/common.hh"
namespace cql3 { class query_processor; }
namespace auth {
class cache : public peering_sharded_service<cache> {
public:
using role_name_t = sstring;
using version_tag_t = char;
struct role_record {
bool can_login = false;
bool is_superuser = false;
std::unordered_set<role_name_t> member_of;
std::unordered_set<role_name_t> members;
sstring salted_hash;
std::unordered_map<sstring, sstring> attributes;
std::unordered_map<sstring, permission_set> permissions;
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
static bool includes_table(const table_id&) noexcept;
private:
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
};
} // namespace auth

View File

@@ -8,6 +8,7 @@
*/
#include "auth/certificate_authenticator.hh"
#include "auth/cache.hh"
#include <boost/regex.hpp>
#include <fmt/ranges.h>
@@ -34,13 +35,14 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();

View File

@@ -26,13 +26,15 @@ class raft_group0_client;
namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
~certificate_authenticator();
future<> start() override;

View File

@@ -48,6 +48,10 @@ extern constinit const std::string_view AUTH_PACKAGE_NAME;
} // namespace meta
constexpr std::string_view PERMISSIONS_CF = "role_permissions";
constexpr std::string_view ROLE_MEMBERS_CF = "role_members";
constexpr std::string_view ROLE_ATTRIBUTES_CF = "role_attributes";
// This is a helper to check whether auth-v2 is on.
bool legacy_mode(cql3::query_processor& qp);

View File

@@ -37,7 +37,6 @@ std::string_view default_authorizer::qualified_java_name() const {
static constexpr std::string_view ROLE_NAME = "role";
static constexpr std::string_view RESOURCE_NAME = "resource";
static constexpr std::string_view PERMISSIONS_NAME = "permissions";
static constexpr std::string_view PERMISSIONS_CF = "role_permissions";
static logging::logger alogger("default_authorizer");

View File

@@ -83,17 +83,18 @@ static const class_registrator<
ldap_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> registration(ldap_role_manager_full_name);
::service::migration_manager&,
cache&> registration(ldap_role_manager_full_name);
ldap_role_manager::ldap_role_manager(
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: _std_mgr(qp, rg0c, mm), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
: _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
, _bind_password(bind_password)
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
}
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
: ldap_role_manager(
qp.db().get_config().ldap_url_template(),
qp.db().get_config().ldap_attr_role(),
@@ -101,7 +102,8 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
qp.db().get_config().ldap_bind_passwd(),
qp,
rg0c,
mm) {
mm,
cache) {
}
std::string_view ldap_role_manager::qualified_java_name() const noexcept {

View File

@@ -14,6 +14,7 @@
#include "ent/ldap/ldap_connection.hh"
#include "standard_role_manager.hh"
#include "auth/cache.hh"
namespace auth {
@@ -43,12 +44,13 @@ class ldap_role_manager : public role_manager {
std::string_view bind_password, ///< LDAP bind credentials.
cql3::query_processor& qp, ///< Passed to standard_role_manager.
::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
::service::migration_manager& mm ///< Passed to standard_role_manager.
::service::migration_manager& mm, ///< Passed to standard_role_manager.
cache& cache ///< Passed to standard_role_manager.
);
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
/// class_registrator<role_manager>.
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm);
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
/// Thrown when query-template parsing fails.
struct url_error : public std::runtime_error {

View File

@@ -11,6 +11,7 @@
#include <seastar/core/future.hh>
#include <stdexcept>
#include <string_view>
#include "auth/cache.hh"
#include "cql3/description.hh"
#include "utils/class_registrator.hh"
@@ -23,7 +24,8 @@ static const class_registrator<
maintenance_socket_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
::service::migration_manager&,
cache&> registration(sstring{maintenance_socket_role_manager_name});
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {

View File

@@ -8,6 +8,7 @@
#pragma once
#include "auth/cache.hh"
#include "auth/resource.hh"
#include "auth/role_manager.hh"
#include <seastar/core/future.hh>
@@ -29,7 +30,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
public:
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -49,6 +49,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -63,10 +64,11 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
@@ -315,11 +317,20 @@ future<authenticated_user> password_authenticator::authenticate(
const sstring password = credentials.at(PASSWORD_KEY);
try {
const std::optional<sstring> salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
std::optional<sstring> salted_hash;
if (legacy_mode(_qp)) {
salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
} else {
auto role = _cache.get(username);
if (!role || role->salted_hash.empty()) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
salted_hash = role->salted_hash;
}
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash = std::move(salted_hash)]{
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
return passwords::check(password, *salted_hash);
});
if (!password_match) {

View File

@@ -16,6 +16,7 @@
#include "db/consistency_level_type.hh"
#include "auth/authenticator.hh"
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
@@ -41,6 +42,7 @@ class password_authenticator : public authenticator {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser; // default superuser name from the config (may or may not be present in roles table)
@@ -53,7 +55,7 @@ public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
~password_authenticator();

View File

@@ -35,9 +35,10 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -11,6 +11,7 @@
#pragma once
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
@@ -29,7 +30,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
future<> start() override;

View File

@@ -17,6 +17,7 @@
#include <chrono>
#include <seastar/core/future-util.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
@@ -157,6 +158,7 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
service::service(
utils::loading_cache_config c,
cache& cache,
cql3::query_processor& qp,
::service::raft_group0_client& g0,
::service::migration_notifier& mn,
@@ -166,6 +168,7 @@ service::service(
maintenance_socket_enabled used_by_maintenance_socket)
: _loading_cache_config(std::move(c))
, _permissions_cache(nullptr)
, _cache(cache)
, _qp(qp)
, _group0_client(g0)
, _mnotifier(mn)
@@ -188,15 +191,17 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache,
utils::alien_worker& hashing_worker)
: service(
std::move(c),
cache,
qp,
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
used_by_maintenance_socket) {
}
@@ -232,6 +237,9 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
auto auth_version = co_await sys_ks.get_auth_version();
// version is set in query processor to be easily available in various places we call auth::legacy_mode check.
_qp.auth_version = auth_version;
if (this_shard_id() == 0) {
co_await _cache.load_all();
}
if (!_used_by_maintenance_socket) {
// this legacy keyspace is only used by cqlsh
// it's needed when executing `list roles` or `list users`

View File

@@ -21,6 +21,7 @@
#include "auth/authorizer.hh"
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/cache.hh"
#include "auth/role_manager.hh"
#include "auth/common.hh"
#include "cql3/description.hh"
@@ -77,6 +78,7 @@ public:
class service final : public seastar::peering_sharded_service<service> {
utils::loading_cache_config _loading_cache_config;
std::unique_ptr<permissions_cache> _permissions_cache;
cache& _cache;
cql3::query_processor& _qp;
@@ -107,6 +109,7 @@ class service final : public seastar::peering_sharded_service<service> {
public:
service(
utils::loading_cache_config,
cache& cache,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_notifier&,
@@ -128,6 +131,7 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&,
utils::alien_worker&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -41,21 +41,6 @@
namespace auth {
namespace meta {
namespace role_members_table {
constexpr std::string_view name{"role_members" , 12};
}
namespace role_attributes_table {
constexpr std::string_view name{"role_attributes", 15};
}
}
static logging::logger log("standard_role_manager");
@@ -64,7 +49,8 @@ static const class_registrator<
standard_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
@@ -121,10 +107,11 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
}
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
{}
@@ -136,7 +123,7 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
const resource_set& standard_role_manager::protected_resources() const {
static const resource_set resources({
make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
make_data_resource(meta::legacy::AUTH_KS, ROLE_MEMBERS_CF)});
return resources;
}
@@ -160,7 +147,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY (role, member)"
")",
meta::legacy::AUTH_KS,
meta::role_members_table::name);
ROLE_MEMBERS_CF);
static const sstring create_role_attributes_query = seastar::format(
"CREATE TABLE {}.{} ("
" role text,"
@@ -169,7 +156,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY(role, name)"
")",
meta::legacy::AUTH_KS,
meta::role_attributes_table::name);
ROLE_ATTRIBUTES_CF);
return when_all_succeed(
create_legacy_metadata_table_if_missing(
meta::roles_table::name,
@@ -177,12 +164,12 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
create_roles_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
meta::role_members_table::name,
ROLE_MEMBERS_CF,
_qp,
create_role_members_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
meta::role_attributes_table::name,
ROLE_ATTRIBUTES_CF,
_qp,
create_role_attributes_query,
_migration_manager)).discard_result();
@@ -429,7 +416,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto revoke_from_members = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("SELECT member FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
meta::role_members_table::name);
ROLE_MEMBERS_CF);
const auto members = co_await _qp.execute_internal(
query,
consistency_for_role(role_name),
@@ -461,7 +448,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto remove_attributes_of = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
meta::role_attributes_table::name);
ROLE_ATTRIBUTES_CF);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name)},
cql3::query_processor::cache_internal::yes).discard_result();
@@ -517,7 +504,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::add: {
const sstring insert_query = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
meta::role_members_table::name);
ROLE_MEMBERS_CF);
co_return co_await _qp.execute_internal(
insert_query,
consistency_for_role(role_name),
@@ -529,7 +516,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::remove: {
const sstring delete_query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
meta::role_members_table::name);
ROLE_MEMBERS_CF);
co_return co_await _qp.execute_internal(
delete_query,
consistency_for_role(role_name),
@@ -567,12 +554,12 @@ standard_role_manager::modify_membership(
case membership_change::add:
modify_role_members = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
meta::role_members_table::name);
ROLE_MEMBERS_CF);
break;
case membership_change::remove:
modify_role_members = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
meta::role_members_table::name);
ROLE_MEMBERS_CF);
break;
default:
on_internal_error(log, format("unknown membership_change value: {}", int(ch)));
@@ -666,7 +653,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
const sstring query = seastar::format("SELECT * FROM {}.{}",
get_auth_ks_name(_qp),
meta::role_members_table::name);
ROLE_MEMBERS_CF);
const auto results = co_await _qp.execute_internal(
query,
@@ -731,15 +718,21 @@ future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
}
future<bool> standard_role_manager::can_login(std::string_view role_name) {
return require_record(_qp, role_name).then([](record r) {
return r.can_login;
});
if (legacy_mode(_qp)) {
const auto r = co_await require_record(_qp, role_name);
co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
meta::role_attributes_table::name);
ROLE_ATTRIBUTES_CF);
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
if (!result_set->empty()) {
const cql3::untyped_result_set_row &row = result_set->one();
@@ -770,7 +763,7 @@ future<> standard_role_manager::set_attribute(std::string_view role_name, std::s
}
const sstring query = seastar::format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
get_auth_ks_name(_qp),
meta::role_attributes_table::name);
ROLE_ATTRIBUTES_CF);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {
@@ -785,7 +778,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
}
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
meta::role_attributes_table::name);
ROLE_ATTRIBUTES_CF);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {

View File

@@ -10,6 +10,7 @@
#include "auth/common.hh"
#include "auth/role_manager.hh"
#include "auth/cache.hh"
#include <string_view>
@@ -36,13 +37,14 @@ class standard_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser;
shared_promise<> _superuser_created_promise;
public:
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -13,6 +13,7 @@
#include "auth/authorizer.hh"
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/cache.hh"
#include "auth/permission.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
@@ -37,8 +38,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, hashing_worker)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -240,6 +241,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<

View File

@@ -445,6 +445,7 @@ ldap_tests = set([
scylla_tests = set([
'test/boost/combined_tests',
'test/boost/UUID_test',
'test/boost/url_parse_test',
'test/boost/advanced_rpc_compressor_test',
'test/boost/allocation_strategy_test',
'test/boost/alternator_unit_test',
@@ -1061,7 +1062,6 @@ scylla_core = (['message/messaging_service.cc',
'db/hints/resource_manager.cc',
'db/hints/sync_point.cc',
'db/large_data_handler.cc',
'db/legacy_schema_migrator.cc',
'db/marshal/type_parser.cc',
'db/per_partition_rate_limit_options.cc',
'db/rate_limiter.cc',
@@ -1195,6 +1195,7 @@ scylla_core = (['message/messaging_service.cc',
'auth/allow_all_authorizer.cc',
'auth/authenticated_user.cc',
'auth/authenticator.cc',
'auth/cache.cc',
'auth/common.cc',
'auth/default_authorizer.cc',
'auth/resource.cc',
@@ -1646,6 +1647,7 @@ deps['test/boost/bytes_ostream_test'] = [
]
deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']

View File

@@ -575,6 +575,15 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
| serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
;
usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
;
usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_TIMEOUT to=term { attrs->timeout = std::move(to); }
| K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
;
/**
* UPDATE <CF>
* USING TIMESTAMP <long>
@@ -666,7 +675,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
auto attrs = std::make_unique<cql3::attributes::raw>();
expression wclause = conjunction{};
}
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
{
auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
@@ -2370,6 +2379,7 @@ K_LIKE: L I K E;
K_TIMEOUT: T I M E O U T;
K_PRUNE: P R U N E;
K_CONCURRENCY: C O N C U R R E N C Y;
K_EXECUTE: E X E C U T E;

View File

@@ -20,19 +20,21 @@
namespace cql3 {
std::unique_ptr<attributes> attributes::none() {
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
}
attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level)
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency)
: _timestamp_unset_guard(timestamp)
, _timestamp{std::move(timestamp)}
, _time_to_live_unset_guard(time_to_live)
, _time_to_live{std::move(time_to_live)}
, _timeout{std::move(timeout)}
, _service_level(std::move(service_level))
, _concurrency{std::move(concurrency)}
{ }
bool attributes::is_timestamp_set() const {
@@ -51,6 +53,10 @@ bool attributes::is_service_level_set() const {
return bool(_service_level);
}
bool attributes::is_concurrency_set() const {
return bool(_concurrency);
}
int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
return now;
@@ -123,6 +129,27 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
return sl_controller.get_service_level(sl_name).slo;
}
std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
if (!_concurrency.has_value()) {
return std::nullopt;
}
cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
if (concurrency_raw.is_null()) {
throw exceptions::invalid_request_exception("Invalid null value of concurrency");
}
int32_t concurrency;
try {
concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception("Invalid concurrency value");
}
if (concurrency <= 0) {
throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
}
return concurrency;
}
void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timestamp.has_value()) {
expr::fill_prepare_context(*_timestamp, ctx);
@@ -133,10 +160,13 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timeout.has_value()) {
expr::fill_prepare_context(*_timeout, ctx);
}
if (_concurrency.has_value()) {
expr::fill_prepare_context(*_concurrency, ctx);
}
}
std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
std::optional<expr::expression> ts, ttl, to;
std::optional<expr::expression> ts, ttl, to, conc;
if (timestamp.has_value()) {
ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
@@ -153,7 +183,12 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
verify_no_aggregate_functions(*timeout, "USING clause");
}
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
if (concurrency.has_value()) {
conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
verify_no_aggregate_functions(*concurrency, "USING clause");
}
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
}
lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
@@ -168,4 +203,8 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
}
lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
}
}

View File

@@ -36,13 +36,15 @@ private:
std::optional<cql3::expr::expression> _time_to_live;
std::optional<cql3::expr::expression> _timeout;
std::optional<sstring> _service_level;
std::optional<cql3::expr::expression> _concurrency;
public:
static std::unique_ptr<attributes> none();
private:
attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level);
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency);
public:
bool is_timestamp_set() const;
@@ -52,6 +54,8 @@ public:
bool is_service_level_set() const;
bool is_concurrency_set() const;
int64_t get_timestamp(int64_t now, const query_options& options);
std::optional<int32_t> get_time_to_live(const query_options& options);
@@ -60,6 +64,8 @@ public:
qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;
std::optional<int32_t> get_concurrency(const query_options& options) const;
void fill_prepare_context(prepare_context& ctx);
class raw final {
@@ -68,6 +74,7 @@ public:
std::optional<cql3::expr::expression> time_to_live;
std::optional<cql3::expr::expression> timeout;
std::optional<sstring> service_level;
std::optional<cql3::expr::expression> concurrency;
std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
private:
@@ -76,6 +83,8 @@ public:
lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
};
};

View File

@@ -165,8 +165,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
service::topology_mutation_builder builder(ts);
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
rtbuilder.set("done", false);
if (!qp.proxy().features().topology_global_request_queue) {
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(global_request_id);

View File

@@ -331,7 +331,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (!cl_for_paxos) [[unlikely]] {
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(std::move(cl_for_paxos).assume_error());
}
seastar::shared_ptr<cas_request> request;
std::unique_ptr<cas_request> request;
schema_ptr schema;
db::timeout_clock::time_point now = db::timeout_clock::now();
@@ -354,9 +354,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (keys.empty()) {
continue;
}
if (request.get() == nullptr) {
if (!request) {
schema = statement.s;
request = seastar::make_shared<cas_request>(schema, std::move(keys));
request = std::make_unique<cas_request>(schema, std::move(keys));
} else if (keys.size() != 1 || keys.front().equal(request->key().front(), dht::ring_position_comparator(*schema)) == false) {
throw exceptions::invalid_request_exception("BATCH with conditions cannot span multiple partitions");
}
@@ -366,7 +366,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
request->add_row_update(statement, std::move(ranges), std::move(json_cache), statement_options);
}
if (request.get() == nullptr) {
if (!request) {
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
}
@@ -377,9 +377,10 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
);
}
return qp.proxy().cas(schema, std::move(cas_shard), request, request->read_command(qp), request->key(),
auto* request_ptr = request.get();
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
});
}

View File

@@ -279,11 +279,15 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
}
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
// Regular secondary indexes require rf-rack-validity.
// Custom indexes need to validate this property themselves, if they need it.
if (!_properties || !_properties->custom_class) {
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
}
validate_for_local_index(*schema);

View File

@@ -401,7 +401,8 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
type.is_update() ? "update" : "deletion"));
}
auto request = seastar::make_shared<cas_request>(s, std::move(keys));
auto request = std::make_unique<cas_request>(s, std::move(keys));
auto* request_ptr = request.get();
// cas_request can be used for batches as well single statements; Here we have just a single
// modification in the list of CAS commands, since we're handling single-statement execution.
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
@@ -427,9 +428,9 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
tablet_info = erm->check_locality(token);
}
return qp.proxy().cas(s, std::move(cas_shard), request, request->read_command(qp), request->key(),
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
result->add_tablet_info(tablet_replicas, token_range);
return result;

View File

@@ -21,7 +21,7 @@ namespace cql3 {
namespace statements {
static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
view->all_columns()
| std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);
int32_t page_size = std::max(options.get_page_size(), 1000);
auto now = gc_clock::now();
@@ -62,7 +62,8 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
auto timeout_duration = get_timeout(state.get_client_state(), options);
dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -10,7 +10,6 @@ target_sources(db
schema_applier.cc
schema_tables.cc
cql_type_parser.cc
legacy_schema_migrator.cc
commitlog/commitlog.cc
commitlog/commitlog_replayer.cc
commitlog/commitlog_entry.cc

View File

@@ -1172,6 +1172,17 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
"* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
/**
* @Group Vector search settings
* @GroupDescription Settings for configuring and tuning vector search functionality.
*/
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
/**
* @Group Security properties
* @GroupDescription Server and client security settings.
*/
@@ -1459,13 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, vector_store_primary_uri(
this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri. The available options are:\n"
"* truststore: (Default: <not set. use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")

View File

@@ -344,6 +344,9 @@ public:
named_value<sstring> request_scheduler;
named_value<sstring> request_scheduler_id;
named_value<string_map> request_scheduler_options;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<sstring> authenticator;
named_value<sstring> internode_authenticator;
named_value<sstring> authorizer;
@@ -471,10 +474,6 @@ public:
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<bool> abort_on_ebadf;
named_value<bool> sanitizer_report_backtrace;

View File

@@ -1,602 +0,0 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
// Since Scylla 2.0, we use system tables whose schemas were introduced in
// Cassandra 3. If Scylla boots to find a data directory with system tables
// with older schemas - produced by pre-2.0 Scylla or by pre-3.0 Cassandra,
// we need to migrate these old tables to the new format.
//
// We provide here a function, db::legacy_schema_migrator::migrate(),
// for a one-time migration from old to new system tables. The function
// reads old system tables, write them back in the new format, and finally
// delete the old system tables. Scylla's main should call this function and
// wait for the returned future, before starting to serve the database.
#include <boost/iterator/filter_iterator.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/util/log.hh>
#include <map>
#include <unordered_set>
#include <chrono>
#include "replica/database.hh"
#include "legacy_schema_migrator.hh"
#include "system_keyspace.hh"
#include "schema_tables.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "utils/rjson.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/util.hh"
#include "cql3/statements/property_definitions.hh"
static seastar::logger mlogger("legacy_schema_migrator");
namespace db {
namespace legacy_schema_migrator {
// local data carriers
class migrator {
public:
static const std::unordered_set<sstring> legacy_schema_tables;
migrator(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp)
: _sp(sp), _db(db), _sys_ks(sys_ks), _qp(qp) {
}
migrator(migrator&&) = default;
typedef db_clock::time_point time_point;
// TODO: we don't support triggers.
// this is a placeholder.
struct trigger {
time_point timestamp;
sstring name;
std::unordered_map<sstring, sstring> options;
};
struct table {
time_point timestamp;
schema_ptr metadata;
std::vector<trigger> triggers;
};
struct type {
time_point timestamp;
user_type metadata;
};
struct function {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
bool called_on_null_input;
sstring language;
sstring body;
};
struct aggregate {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
sstring final_func;
sstring initcond;
sstring state_func;
sstring state_type;
};
struct keyspace {
time_point timestamp;
sstring name;
bool durable_writes;
std::map<sstring, sstring> replication_params;
std::vector<table> tables;
std::vector<type> types;
std::vector<function> functions;
std::vector<aggregate> aggregates;
};
class unsupported_feature : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
static sstring fmt_query(const char* fmt, const char* table) {
return fmt::format(fmt::runtime(fmt), db::system_keyspace::NAME, table);
}
typedef ::shared_ptr<cql3::untyped_result_set> result_set_type;
typedef const cql3::untyped_result_set::row row_type;
future<> read_table(keyspace& dst, sstring cf_name, time_point timestamp) {
auto fmt = "SELECT * FROM {}.{} WHERE keyspace_name = ? AND columnfamily_name = ?";
auto tq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNFAMILIES);
auto cq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNS);
auto zq = fmt_query(fmt, db::system_keyspace::legacy::TRIGGERS);
typedef std::tuple<future<result_set_type>, future<result_set_type>, future<result_set_type>, future<db::schema_tables::legacy::schema_mutations>> result_tuple;
return when_all(_qp.execute_internal(tq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(cq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(zq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
db::schema_tables::legacy::read_table_mutations(_sp, dst.name, cf_name, db::system_keyspace::legacy::column_families()))
.then([&dst, cf_name, timestamp](result_tuple&& t) {
result_set_type tables = std::get<0>(t).get();
result_set_type columns = std::get<1>(t).get();
result_set_type triggers = std::get<2>(t).get();
db::schema_tables::legacy::schema_mutations sm = std::get<3>(t).get();
row_type& td = tables->one();
auto ks_name = td.get_as<sstring>("keyspace_name");
auto cf_name = td.get_as<sstring>("columnfamily_name");
auto id = table_id(td.get_or("cf_id", generate_legacy_id(ks_name, cf_name).uuid()));
schema_builder builder(dst.name, cf_name, id);
builder.with_version(sm.digest());
cf_type cf = sstring_to_cf_type(td.get_or("type", sstring("standard")));
if (cf == cf_type::super) {
fail(unimplemented::cause::SUPER);
}
auto comparator = td.get_as<sstring>("comparator");
bool is_compound = cell_comparator::check_compound(comparator);
builder.set_is_compound(is_compound);
cell_comparator::read_collections(builder, comparator);
bool filter_sparse = false;
data_type default_validator = {};
if (td.has("default_validator")) {
default_validator = db::schema_tables::parse_type(td.get_as<sstring>("default_validator"));
if (default_validator->is_counter()) {
builder.set_is_counter(true);
}
builder.set_default_validation_class(default_validator);
}
/*
* Determine whether or not the table is *really* dense
* We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively),
* but we can trust is_dense value of false.
*/
auto is_dense = td.get_opt<bool>("is_dense");
if (!is_dense || *is_dense) {
is_dense = [&] {
/*
* As said above, this method is only here because we need to deal with thrift upgrades.
* Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
* then we'll have saved the "is_dense" value and will be good to go.
*
* But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
* to infer that information without relying on it in that case. And for the most part this is
* easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
* having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
* PRIMARY KEY defined.
*
* So we need to recognize those special case CQL3 table with only a primary key. If we have some
* clustering columns, we're fine as said above. So the only problem is that we cannot decide for
* sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
* has been created in CQL3 by say:
* CREATE TABLE test (k int PRIMARY KEY)
* in which case it should not be dense. However, we can limit our margin of error by assuming we are
* in the latter case only if the comparator is exactly CompositeType(UTF8Type).
*/
std::optional<column_id> max_cl_idx;
const cql3::untyped_result_set::row * regular = nullptr;
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
if (kind_str == "compact_value") {
continue;
}
auto kind = db::schema_tables::deserialize_kind(kind_str);
if (kind == column_kind::regular_column) {
if (regular != nullptr) {
return false;
}
regular = &row;
continue;
}
if (kind == column_kind::clustering_key) {
max_cl_idx = std::max(column_id(row.get_or("component_index", 0)), max_cl_idx.value_or(column_id()));
}
}
auto is_cql3_only_pk_comparator = [](const sstring& comparator) {
if (!cell_comparator::check_compound(comparator)) {
return false;
}
// CMH. We don't have composites, nor a parser for it. This is a simple way of c
// checking the same.
auto comma = comparator.find(',');
if (comma != sstring::npos) {
return false;
}
auto off = comparator.find('(');
auto end = comparator.find(')');
return comparator.compare(off, end - off, utf8_type->name()) == 0;
};
if (max_cl_idx) {
auto n = std::count(comparator.begin(), comparator.end(), ','); // num comp - 1
return *max_cl_idx == n;
}
if (regular) {
return false;
}
return !is_cql3_only_pk_comparator(comparator);
}();
// now, if switched to sparse, remove redundant compact_value column and the last clustering column,
// directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
filter_sparse = !*is_dense;
}
builder.set_is_dense(*is_dense);
auto is_cql = !*is_dense && is_compound;
auto is_static_compact = !*is_dense && !is_compound;
// org.apache.cassandra.schema.LegacySchemaMigrator#isEmptyCompactValueColumn
auto is_empty_compact_value = [](const cql3::untyped_result_set::row& column_row) {
auto kind_str = column_row.get_as<sstring>("type");
// Cassandra only checks for "compact_value", but Scylla generates "regular" instead (#2586)
return (kind_str == "compact_value" || kind_str == "regular")
&& column_row.get_as<sstring>("column_name").empty();
};
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
auto kind = db::schema_tables::deserialize_kind(kind_str);
auto component_index = kind > column_kind::clustering_key ? 0 : column_id(row.get_or("component_index", 0));
auto name = row.get_or<sstring>("column_name", sstring());
auto validator = db::schema_tables::parse_type(row.get_as<sstring>("validator"));
if (is_empty_compact_value(row)) {
continue;
}
if (filter_sparse) {
if (kind_str == "compact_value") {
continue;
}
if (kind == column_kind::clustering_key) {
if (cf == cf_type::super && component_index != 0) {
continue;
}
if (cf != cf_type::super && !is_compound) {
continue;
}
}
}
std::optional<index_metadata_kind> index_kind;
sstring index_name;
index_options_map options;
if (row.has("index_type")) {
index_kind = schema_tables::deserialize_index_kind(row.get_as<sstring>("index_type"));
}
if (row.has("index_name")) {
index_name = row.get_as<sstring>("index_name");
}
if (row.has("index_options")) {
sstring index_options_str = row.get_as<sstring>("index_options");
options = rjson::parse_to_map<index_options_map>(std::string_view(index_options_str));
sstring type;
auto i = options.find("index_keys");
if (i != options.end()) {
options.erase(i);
type = "KEYS";
}
i = options.find("index_keys_and_values");
if (i != options.end()) {
options.erase(i);
type = "KEYS_AND_VALUES";
}
if (type.empty()) {
if (validator->is_collection() && validator->is_multi_cell()) {
type = "FULL";
} else {
type = "VALUES";
}
}
auto column = cql3::util::maybe_quote(name);
options["target"] = validator->is_collection()
? type + "(" + column + ")"
: column;
}
if (index_kind) {
// Origin assumes index_name is always set, so let's do the same
builder.with_index(index_metadata(index_name, options, *index_kind, index_metadata::is_local_index::no));
}
data_type column_name_type = [&] {
if (is_static_compact && kind == column_kind::regular_column) {
return db::schema_tables::parse_type(comparator);
}
return utf8_type;
}();
auto column_name = [&] {
try {
return column_name_type->from_string(name);
} catch (marshal_exception&) {
// #2597: Scylla < 2.0 writes names in serialized form, try to recover
column_name_type->validate(to_bytes_view(name));
return to_bytes(name);
}
}();
builder.with_column_ordered(column_definition(std::move(column_name), std::move(validator), kind, component_index));
}
if (is_static_compact) {
builder.set_regular_column_name_type(db::schema_tables::parse_type(comparator));
}
if (td.has("gc_grace_seconds")) {
builder.set_gc_grace_seconds(td.get_as<int32_t>("gc_grace_seconds"));
}
if (td.has("min_compaction_threshold")) {
builder.set_min_compaction_threshold(td.get_as<int32_t>("min_compaction_threshold"));
}
if (td.has("max_compaction_threshold")) {
builder.set_max_compaction_threshold(td.get_as<int32_t>("max_compaction_threshold"));
}
if (td.has("comment")) {
builder.set_comment(td.get_as<sstring>("comment"));
}
if (td.has("memtable_flush_period_in_ms")) {
builder.set_memtable_flush_period(td.get_as<int32_t>("memtable_flush_period_in_ms"));
}
if (td.has("caching")) {
builder.set_caching_options(caching_options::from_sstring(td.get_as<sstring>("caching")));
}
if (td.has("default_time_to_live")) {
builder.set_default_time_to_live(gc_clock::duration(td.get_as<int32_t>("default_time_to_live")));
}
if (td.has("speculative_retry")) {
builder.set_speculative_retry(td.get_as<sstring>("speculative_retry"));
}
if (td.has("compaction_strategy_class")) {
auto strategy = td.get_as<sstring>("compaction_strategy_class");
try {
builder.set_compaction_strategy(compaction::compaction_strategy::type(strategy));
} catch (const exceptions::configuration_exception& e) {
// If compaction strategy class isn't supported, fallback to incremental.
mlogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
}
}
if (td.has("compaction_strategy_options")) {
sstring strategy_options_str = td.get_as<sstring>("compaction_strategy_options");
builder.set_compaction_strategy_options(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options_str)));
}
auto comp_param = td.get_as<sstring>("compression_parameters");
compression_parameters cp(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(comp_param)));
builder.set_compressor_params(cp);
if (td.has("min_index_interval")) {
builder.set_min_index_interval(td.get_as<int32_t>("min_index_interval"));
} else if (td.has("index_interval")) { // compatibility
builder.set_min_index_interval(td.get_as<int32_t>("index_interval"));
}
if (td.has("max_index_interval")) {
builder.set_max_index_interval(td.get_as<int32_t>("max_index_interval"));
}
if (td.has("bloom_filter_fp_chance")) {
builder.set_bloom_filter_fp_chance(td.get_as<double>("bloom_filter_fp_chance"));
} else {
builder.set_bloom_filter_fp_chance(builder.get_bloom_filter_fp_chance());
}
if (td.has("dropped_columns")) {
auto map = td.get_map<sstring, int64_t>("dropped_columns");
for (auto&& e : map) {
builder.without_column(e.first, api::timestamp_type(e.second));
};
}
// ignore version. we're transient
if (!triggers->empty()) {
throw unsupported_feature("triggers");
}
dst.tables.emplace_back(table{timestamp, builder.build() });
});
}
future<> read_tables(keyspace& dst) {
auto query = fmt_query("SELECT columnfamily_name, writeTime(type) AS timestamp FROM {}.{} WHERE keyspace_name = ?",
db::system_keyspace::legacy::COLUMNFAMILIES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
return read_table(dst, row.get_as<sstring>("columnfamily_name"), row.get_as<time_point>("timestamp"));
}).finally([result] {});
});
}
future<time_point> read_type_timestamp(keyspace& dst, sstring type_name) {
// TODO: Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
// use the writeTime() CQL function, and must resort to a lower level.
// Origin digs up the actual cells of target partition and gets timestamp from there.
// We should do the same, but g-dam that's messy. Lets give back dung value for now.
return make_ready_future<time_point>(dst.timestamp);
}
future<> read_types(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
auto name = row.get_blob_unfragmented("type_name");
auto columns = row.get_list<bytes>("field_names");
auto types = row.get_list<sstring>("field_types");
std::vector<data_type> field_types;
for (auto&& value : types) {
field_types.emplace_back(db::schema_tables::parse_type(value));
}
auto ut = user_type_impl::get_instance(dst.name, name, columns, field_types, false);
return read_type_timestamp(dst, value_cast<sstring>(utf8_type->deserialize(name))).then([ut = std::move(ut), &dst](time_point timestamp) {
dst.types.emplace_back(type{timestamp, ut});
});
}).finally([result] {});
});
}
future<> read_functions(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::FUNCTIONS);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("functions");
}
});
}
future<> read_aggregates(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::AGGREGATES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("aggregates");
}
});
}
future<keyspace> read_keyspace(sstring ks_name, bool durable_writes, sstring strategy_class, sstring strategy_options, time_point timestamp) {
auto map = rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options));
map.emplace("class", std::move(strategy_class));
auto ks = ::make_lw_shared<keyspace>(keyspace{timestamp, std::move(ks_name), durable_writes, std::move(map) });
return read_tables(*ks).then([this, ks] {
//Collection<Type> types = readTypes(keyspaceName);
return read_types(*ks);
}).then([this, ks] {
return read_functions(*ks);
}).then([this, ks] {
return read_aggregates(*ks);
}).then([ks] {
return make_ready_future<keyspace>(std::move(*ks));
});
}
future<> read_all_keyspaces() {
static auto ks_filter = [](row_type& row) {
auto ks_name = row.get_as<sstring>("keyspace_name");
return ks_name != db::system_keyspace::NAME && ks_name != db::schema_tables::v3::NAME;
};
auto query = fmt_query("SELECT keyspace_name, durable_writes, strategy_options, strategy_class, writeTime(durable_writes) AS timestamp FROM {}.{}",
db::system_keyspace::legacy::KEYSPACES);
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this](result_set_type result) {
auto i = boost::make_filter_iterator(ks_filter, result->begin(), result->end());
auto e = boost::make_filter_iterator(ks_filter, result->end(), result->end());
return parallel_for_each(i, e, [this](row_type& row) {
return read_keyspace(row.get_as<sstring>("keyspace_name")
, row.get_as<bool>("durable_writes")
, row.get_as<sstring>("strategy_class")
, row.get_as<sstring>("strategy_options")
, row.get_as<db_clock::time_point>("timestamp")
).then([this](keyspace ks) {
_keyspaces.emplace_back(std::move(ks));
});
}).finally([result] {});
});
}
future<> drop_legacy_tables() {
mlogger.info("Dropping legacy schema tables");
auto with_snapshot = !_keyspaces.empty();
for (const sstring& cfname : legacy_schema_tables) {
co_await replica::database::legacy_drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
}
}
future<> store_keyspaces_in_new_schema_tables() {
mlogger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
_keyspaces.size(), db::schema_tables::v3::NAME);
utils::chunked_vector<mutation> mutations;
for (auto& ks : _keyspaces) {
auto ksm = ::make_lw_shared<keyspace_metadata>(ks.name
, ks.replication_params["class"] // TODO, make ksm like c3?
, cql3::statements::property_definitions::to_extended_map(ks.replication_params)
, std::nullopt
, std::nullopt
, ks.durable_writes);
// we want separate time stamps for tables/types, so cannot bulk them into the ksm.
for (auto&& m : db::schema_tables::make_create_keyspace_mutations(schema_features::full(), ksm, ks.timestamp.time_since_epoch().count(), false)) {
mutations.emplace_back(std::move(m));
}
for (auto& t : ks.tables) {
db::schema_tables::add_table_or_view_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), true, mutations);
}
for (auto& t : ks.types) {
db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
}
}
return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
}
future<> flush_schemas() {
auto& db = _qp.db().real_database().container();
return replica::database::flush_tables_on_all_shards(db, db::schema_tables::all_table_infos(schema_features::full()));
}
future<> migrate() {
return read_all_keyspaces().then([this]() {
// write metadata to the new schema tables
return store_keyspaces_in_new_schema_tables()
.then(std::bind(&migrator::flush_schemas, this))
.then(std::bind(&migrator::drop_legacy_tables, this))
.then([] { mlogger.info("Completed migration of legacy schema tables"); });
});
}
sharded<service::storage_proxy>& _sp;
sharded<replica::database>& _db;
sharded<db::system_keyspace>& _sys_ks;
cql3::query_processor& _qp;
std::vector<keyspace> _keyspaces;
};
const std::unordered_set<sstring> migrator::legacy_schema_tables = {
db::system_keyspace::legacy::KEYSPACES,
db::system_keyspace::legacy::COLUMNFAMILIES,
db::system_keyspace::legacy::COLUMNS,
db::system_keyspace::legacy::TRIGGERS,
db::system_keyspace::legacy::USERTYPES,
db::system_keyspace::legacy::FUNCTIONS,
db::system_keyspace::legacy::AGGREGATES,
};
}
}
future<>
db::legacy_schema_migrator::migrate(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp) {
return do_with(migrator(sp, db, sys_ks, qp), std::bind(&migrator::migrate, std::placeholders::_1));
}

View File

@@ -1,37 +0,0 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "seastarx.hh"
namespace replica {
class database;
}
namespace cql3 {
class query_processor;
}
namespace service {
class storage_proxy;
}
namespace db {
class system_keyspace;
namespace legacy_schema_migrator {
future<> migrate(sharded<service::storage_proxy>&, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor&);
}
}

View File

@@ -542,6 +542,7 @@ public:
// Returns the range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
// Excludes the range for the row itself. That information is returned by range_tombstone_for_row().
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
// Note that this is different from the meaning of rows_entry::range_tombstone(), which includes the row itself.
tombstone range_tombstone() const { return _range_tombstone; }
// Can be called when cursor is pointing at a row.

View File

@@ -1287,6 +1287,15 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
, _partitions(dht::raw_token_less_comparator{})
, _underlying(src())
, _snapshot_source(std::move(src))
, _update_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.update {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _populate_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.populate {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _read_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.read {}.{}", _schema->ks_name(), _schema->cf_name());
}))
{
try {
with_allocator(_tracker.allocator(), [this, cont] {

View File

@@ -404,10 +404,7 @@ const std::unordered_set<table_id>& schema_tables_holding_schema_mutations() {
computed_columns(),
dropped_columns(),
indexes(),
scylla_tables(),
db::system_keyspace::legacy::column_families(),
db::system_keyspace::legacy::columns(),
db::system_keyspace::legacy::triggers()}) {
scylla_tables()}) {
SCYLLA_ASSERT(s->clustering_key_size() > 0);
auto&& first_column_name = s->clustering_column_at(0).name_as_text();
SCYLLA_ASSERT(first_column_name == "table_name"
@@ -2840,26 +2837,6 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
}
namespace legacy {
table_schema_version schema_mutations::digest() const {
md5_hasher h;
const db::schema_features no_features;
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s)
{
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
}
} // namespace legacy
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);

View File

@@ -155,24 +155,6 @@ schema_ptr scylla_table_schema_history();
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
}
namespace legacy {
class schema_mutations {
mutation _columnfamilies;
mutation _columns;
public:
schema_mutations(mutation columnfamilies, mutation columns)
: _columnfamilies(std::move(columnfamilies))
, _columns(std::move(columns))
{ }
table_schema_version digest() const;
};
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s);
}
struct qualified_name {
sstring keyspace_name;
sstring table_name;

View File

@@ -847,8 +847,6 @@ schema_ptr system_keyspace::corrupt_data() {
return corrupt_data;
}
static constexpr auto schema_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
/*static*/ schema_ptr system_keyspace::scylla_local() {
static thread_local auto scylla_local = [] {
schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL,
@@ -1360,289 +1358,6 @@ schema_ptr system_keyspace::role_permissions() {
return schema;
}
schema_ptr system_keyspace::legacy::hints() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
// partition key
{{"target_id", uuid_type}},
// clustering key
{{"hint_id", timeuuid_type}, {"message_version", int32_type}},
// regular columns
{{"mutation", bytes_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* hints awaiting delivery"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"enabled", "false"}});
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::batchlog() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
// partition key
{{"id", uuid_type}},
// clustering key
{},
// regular columns
{{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* batchlog entries"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::keyspaces() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, KEYSPACES), NAME, KEYSPACES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{},
// regular columns
{
{"durable_writes", boolean_type},
{"strategy_class", utf8_type},
{"strategy_options", utf8_type}
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* keyspace definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::column_families() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNFAMILIES), NAME, COLUMNFAMILIES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}},
// regular columns
{
{"bloom_filter_fp_chance", double_type},
{"caching", utf8_type},
{"cf_id", uuid_type},
{"comment", utf8_type},
{"compaction_strategy_class", utf8_type},
{"compaction_strategy_options", utf8_type},
{"comparator", utf8_type},
{"compression_parameters", utf8_type},
{"default_time_to_live", int32_type},
{"default_validator", utf8_type},
{"dropped_columns", map_type_impl::get_instance(utf8_type, long_type, true)},
{"gc_grace_seconds", int32_type},
{"is_dense", boolean_type},
{"key_validator", utf8_type},
{"max_compaction_threshold", int32_type},
{"max_index_interval", int32_type},
{"memtable_flush_period_in_ms", int32_type},
{"min_compaction_threshold", int32_type},
{"min_index_interval", int32_type},
{"speculative_retry", utf8_type},
{"subcomparator", utf8_type},
{"type", utf8_type},
// The following 4 columns are only present up until 2.1.8 tables
{"key_aliases", utf8_type},
{"value_alias", utf8_type},
{"column_aliases", utf8_type},
{"index_interval", int32_type},},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* table definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::columns() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNS), NAME, COLUMNS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"column_name", utf8_type}},
// regular columns
{
{"component_index", int32_type},
{"index_name", utf8_type},
{"index_options", utf8_type},
{"index_type", utf8_type},
{"type", utf8_type},
{"validator", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"column definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::triggers() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, TRIGGERS), NAME, TRIGGERS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"trigger_name", utf8_type}},
// regular columns
{
{"trigger_options", map_type_impl::get_instance(utf8_type, utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"trigger definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::usertypes() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, USERTYPES), NAME, USERTYPES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"type_name", utf8_type}},
// regular columns
{
{"field_names", list_type_impl::get_instance(utf8_type, true)},
{"field_types", list_type_impl::get_instance(utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::functions() {
/**
* Note: we have our own "legacy" version of this table (in schema_tables),
* but it is (afaik) not used, and differs slightly from the origin one.
* This is based on the origin schema, since we're more likely to encounter
* installations of that to migrate, rather than our own (if we dont use the table).
*/
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, FUNCTIONS), NAME, FUNCTIONS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"function_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_names", list_type_impl::get_instance(utf8_type, true)},
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"body", utf8_type},
{"language", utf8_type},
{"return_type", utf8_type},
{"called_on_null_input", boolean_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::aggregates() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, AGGREGATES), NAME, AGGREGATES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"aggregate_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"final_func", utf8_type},
{"initcond", bytes_type},
{"return_type", utf8_type},
{"state_func", utf8_type},
{"state_type", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined aggregate definition"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::dicts() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, DICTS);
@@ -2615,13 +2330,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
r.insert(r.end(), {sstables_registry()});
}
// legacy schema
r.insert(r.end(), {
// TODO: once we migrate hints/batchlog and add converter
// legacy::hints(), legacy::batchlog(),
legacy::keyspaces(), legacy::column_families(),
legacy::columns(), legacy::triggers(), legacy::usertypes(),
legacy::functions(), legacy::aggregates(), });
return r;
}

View File

@@ -241,28 +241,6 @@ public:
static schema_ptr cdc_local();
};
struct legacy {
static constexpr auto HINTS = "hints";
static constexpr auto BATCHLOG = "batchlog";
static constexpr auto KEYSPACES = "schema_keyspaces";
static constexpr auto COLUMNFAMILIES = "schema_columnfamilies";
static constexpr auto COLUMNS = "schema_columns";
static constexpr auto TRIGGERS = "schema_triggers";
static constexpr auto USERTYPES = "schema_usertypes";
static constexpr auto FUNCTIONS = "schema_functions";
static constexpr auto AGGREGATES = "schema_aggregates";
static schema_ptr keyspaces();
static schema_ptr column_families();
static schema_ptr columns();
static schema_ptr triggers();
static schema_ptr usertypes();
static schema_ptr functions();
static schema_ptr aggregates();
static schema_ptr hints();
static schema_ptr batchlog();
};
// Partition estimates for a given range of tokens.
struct range_estimates {
schema_ptr schema;

View File

@@ -9,6 +9,8 @@
#include "query/query-result-reader.hh"
#include "replica/database_fwd.hh"
#include "db/timeout_clock.hh"
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
namespace service {
class storage_proxy;
@@ -25,8 +27,14 @@ class delete_ghost_rows_visitor {
replica::table& _view_table;
schema_ptr _base_schema;
std::optional<partition_key> _view_pk;
db::timeout_semaphore _concurrency_semaphore;
seastar::gate _gate;
std::exception_ptr& _ex;
public:
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex);
delete_ghost_rows_visitor(delete_ghost_rows_visitor&&) = default;
~delete_ghost_rows_visitor() noexcept;
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
}
@@ -45,6 +53,9 @@ public:
uint32_t accept_partition_end(const query::result_row_view& static_row) {
return 0;
}
private:
future<> do_accept_new_row(partition_key pk, clustering_key ck);
};
} //namespace db::view

View File

@@ -3597,7 +3597,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
})
{ }
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
: _proxy(proxy)
, _state(state)
, _timeout_duration(timeout_duration)
@@ -3605,8 +3605,20 @@ delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& pro
, _view_table(_proxy.get_db().local().find_column_family(view))
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
, _view_pk()
, _concurrency_semaphore(concurrency)
, _ex(ex)
{}
delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
try {
_gate.close().get();
} catch (...) {
// Closing the gate should never throw, but if it does anyway, capture the exception.
_ex = std::current_exception();
}
}
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
SCYLLA_ASSERT(thread::running_in_thread());
_view_pk = key;
@@ -3614,7 +3626,18 @@ void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, u
// Assumes running in seastar::thread
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
auto view_exploded_pk = _view_pk->explode();
auto units = get_units(_concurrency_semaphore, 1).get();
(void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
if (f.failed()) {
_ex = f.get_exception();
}
});
});
}
future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
auto view_exploded_pk = pk.explode();
auto view_exploded_ck = ck.explode();
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
@@ -3649,17 +3672,17 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
auto timeout = db::timeout_clock::now() + _timeout_duration;
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
query::result& result = *base_qr.query_result;
auto delete_ghost_row = [&]() {
mutation m(_view, *_view_pk);
auto delete_ghost_row = [&]() -> future<> {
mutation m(_view, pk);
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
};
if (result.row_count().value_or(0) == 0) {
delete_ghost_row();
co_await delete_ghost_row();
} else if (!view_key_cols_not_in_base_key.empty()) {
if (result.row_count().value_or(0) != 1) {
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
@@ -3669,7 +3692,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
delete_ghost_row();
co_await delete_ghost_row();
break;
}
}

View File

@@ -2,7 +2,6 @@ etc/default/scylla-server
etc/default/scylla-housekeeping
etc/scylla.d/*.conf
etc/bash_completion.d/nodetool-completion
opt/scylladb/share/p11-kit/modules/*
opt/scylladb/share/doc/scylla/*
opt/scylladb/share/doc/scylla/licenses/
usr/lib/systemd/system/*.timer

View File

@@ -122,7 +122,6 @@ ln -sfT /etc/scylla /var/lib/scylla/conf
%config(noreplace) %{_sysconfdir}/sysconfig/scylla-housekeeping
%attr(0755,root,root) %dir %{_sysconfdir}/scylla.d
%config(noreplace) %{_sysconfdir}/scylla.d/*.conf
/opt/scylladb/share/p11-kit/modules/*
/opt/scylladb/share/doc/scylla/*
%{_unitdir}/scylla-fstrim.service
%{_unitdir}/scylla-housekeeping-daily.service

View File

@@ -1,6 +1,18 @@
### a dictionary of redirections
#old path: new path
# Move the diver information to another project
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
/stable/using-scylla/drivers/dynamo-drivers/index.html: https://docs.scylladb.com/stable/drivers/dynamo-drivers.html
/stable/using-scylla/drivers/cql-drivers/index.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-python-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-java-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-go-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-gocqlx-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-cpp-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-rust-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
# Redirect 2025.1 upgrade guides that are not on master but were indexed by Google (404 reported)
/master/upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/upgrade-guide-from-2024.x-to-2025.1.html: https://docs.scylladb.com/manual/stable/upgrade/index.html

View File

@@ -106,6 +106,15 @@ which is recommended in order to make the operation less heavyweight
and allow for running multiple parallel pruning statements for non-overlapping
token ranges.
By default, the PRUNE MATERIALIZED VIEW statement is relatively slow, only
performing one base read or write at a time. This can be changed with the
USING CONCURRENCY clause. If the clause is used, the concurrency of reads
and writes from the base table will be allowed to increase up to the specified
value. For example, to run the PRUNE with 100 parallel reads/writes, you can use:
```cql
PRUNE MATERIALIZED VIEW my_view WHERE v = 19 USING CONCURRENCY 100;
```
## Synchronous materialized views
Usually, when a table with materialized views is updated, the update to the

View File

@@ -45,6 +45,22 @@ immediately after it's finished.
A flag which determines if a task can be aborted through API.
# Task timing fields
Tasks have three timing fields that track different stages of their lifecycle:
- `creation_time` - When the task was created/queued. This is extracted from the task's
UUID (which is a timeuuid) and represents the moment the task request was submitted.
- `start_time` - When the task actually began executing. For tasks that are queued, this
will be unspecified (equal to epoch) until execution starts. For node operations
like decommission, this is set when the request is picked up for execution by the
topology coordinator.
- `end_time` - When the task completed (successfully or with an error). This is
unspecified (equal to epoch) until the task finishes.
The difference between `creation_time` and `start_time` represents the time a task
spent waiting in the queue before execution began.
# Type vs scope vs kind
`type` of a task describes what operation is covered by a task,

View File

@@ -37,7 +37,7 @@ Getting Started
:id: "getting-started"
:class: my-panel
* :doc:`ScyllaDB Drivers</using-scylla/drivers/index>`
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* `Get Started Lesson on ScyllaDB University <https://university.scylladb.com/courses/scylla-essentials-overview/lessons/quick-wins-install-and-run-scylla/>`_
* :doc:`CQL Reference </cql/index>`
* :doc:`cqlsh - the CQL shell </cql/cqlsh/>`

View File

@@ -35,7 +35,7 @@ Documentation Highlights
* :doc:`Cluster Management Procedures </operating-scylla/procedures/cluster-management/index>`
* :doc:`Upgrade ScyllaDB </upgrade/index>`
* :doc:`CQL Reference </cql/index>`
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* :doc:`Features </features/index>`
ScyllaDB Support

View File

@@ -172,7 +172,7 @@ For example:
* `ScyllaDB Java Driver <https://github.com/scylladb/java-driver/tree/3.7.1-scylla/manual/compression>`_
* `Go Driver <https://godoc.org/github.com/gocql/gocql#Compressor>`_
Refer to the :doc:`Drivers Page </using-scylla/drivers/index>` for more drivers.
Refer to `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ for more drivers.
.. _internode-compression:

View File

@@ -110,7 +110,6 @@ To display the log classes (output changes with each version so your display may
keys
keyspace_utils
large_data
legacy_schema_migrator
lister
load_balancer
load_broadcaster

View File

@@ -42,21 +42,21 @@ For single list:
.. code-block:: shell
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
5116ddb6-85b5-4c3e-94fb-72128f15d7b4 repair node keyspace done 3 abc 0 2025-01-16T16:12:11Z 2025-01-16T16:12:13Z
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
5116ddb6-85b5-4c3e-94fb-72128f15d7b4 repair node keyspace done 3 abc 0 2025-01-16T16:12:08Z 2025-01-16T16:12:11Z 2025-01-16T16:12:13Z
With repetition:
.. code-block:: shell
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:54Z 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:42Z 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
1e535f9b-97fa-4788-a956-8f3216a6ea8d repair node keyspace created 6 abc 0
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
1e535f9b-97fa-4788-a956-8f3216a6ea8d repair node keyspace created 6 abc 0 2025-01-16T16:13:02Z
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:54Z 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:42Z 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
See also
--------

View File

@@ -25,6 +25,7 @@ Example output
scope: keyspace
state: running
is_abortable: true
creation_time: 2024-07-29T15:48:50Z
start_time: 2024-07-29T15:48:55Z
end_time:
error:

View File

@@ -26,22 +26,22 @@ For single task:
.. code-block:: shell
id type kind scope state is_abortable start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
be5559ea-bc5a-428c-b8ce-d14eac7a1765 repair node keyspace done true 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z none 1 0 abc ranges 4 4 [{task_id: 542e38cb-9ad4-40aa-9010-de2630004e55, node: 127.0.0.1 }, {task_id: 8974ebcc-1e87-4040-88fe-f2438261f7fb, node: 127.0.0.1 }]
542e38cb-9ad4-40aa-9010-de2630004e55 repair node shard done false 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 0 abc ranges 2 2 []
8974ebcc-1e87-4040-88fe-f2438261f7fb repair node shard done false 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 1 abc ranges 2 2 []
id type kind scope state is_abortable creation_time start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
be5559ea-bc5a-428c-b8ce-d14eac7a1765 repair node keyspace done true 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z none 1 0 abc ranges 4 4 [{task_id: 542e38cb-9ad4-40aa-9010-de2630004e55, node: 127.0.0.1 }, {task_id: 8974ebcc-1e87-4040-88fe-f2438261f7fb, node: 127.0.0.1 }]
542e38cb-9ad4-40aa-9010-de2630004e55 repair node shard done false 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 0 abc ranges 2 2 []
8974ebcc-1e87-4040-88fe-f2438261f7fb repair node shard done false 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 1 abc ranges 2 2 []
For all tasks:
.. code-block:: shell
id type kind scope state is_abortable start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc repair node keyspace done true 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z none 1 0 abc ranges 4 4 [{task_id: e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee, node: 127.0.0.1 }, {task_id: 49eb5797-b67e-46b0-9365-4460f7cf988a, node: 127.0.0.1 }]
e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee repair node shard done false 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 0 abc ranges 2 2 []
49eb5797-b67e-46b0-9365-4460f7cf988a repair node shard done false 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 1 abc ranges 2 2 []
82d7b2a4-146e-4a72-ba93-c66d5b4e9867 offstrategy compaction node keyspace done true 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z none 954 0 abc 1 1 [{task_id: 9818277b-238d-4298-a56b-c0d2153bf140, node: 127.0.0.1 }, {task_id: c1eb0701-ad7a-45ff-956f-7b8d671fc5db, node: 127.0.0.1 }
9818277b-238d-4298-a56b-c0d2153bf140 offstrategy compaction node shard done false 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 0 abc 1 1 []
c1eb0701-ad7a-45ff-956f-7b8d671fc5db offstrategy compaction node shard done false 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 1 abc 1 1 []
id type kind scope state is_abortable creation_time start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc repair node keyspace done true 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z none 1 0 abc ranges 4 4 [{task_id: e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee, node: 127.0.0.1 }, {task_id: 49eb5797-b67e-46b0-9365-4460f7cf988a, node: 127.0.0.1 }]
e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee repair node shard done false 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 0 abc ranges 2 2 []
49eb5797-b67e-46b0-9365-4460f7cf988a repair node shard done false 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 1 abc ranges 2 2 []
82d7b2a4-146e-4a72-ba93-c66d5b4e9867 offstrategy compaction node keyspace done true 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z none 954 0 abc 1 1 [{task_id: 9818277b-238d-4298-a56b-c0d2153bf140, node: 127.0.0.1 }, {task_id: c1eb0701-ad7a-45ff-956f-7b8d671fc5db, node: 127.0.0.1 }
9818277b-238d-4298-a56b-c0d2153bf140 offstrategy compaction node shard done false 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 0 abc 1 1 []
c1eb0701-ad7a-45ff-956f-7b8d671fc5db offstrategy compaction node shard done false 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 1 abc 1 1 []
See also
--------

View File

@@ -206,7 +206,7 @@ This is 19% of the latency compared to no batching.
Driver Guidelines
-----------------
Use the :doc:`ScyllaDB drivers </using-scylla/drivers/index>` that are available for Java, Python, Go, and C/C++.
Use the `ScyllaDB drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ that are available for Java, Python, Go, and C/C++.
They provide much better performance than third-party drivers because they are shard aware &emdash; they can route requests to the right CPU core (shard).
When the driver starts, it gets the topology of the cluster and therefore it knows exactly which CPU core should get a request.
Our latest shard-aware drivers also improve the efficiency of our Change Data Capture (CDC) feature.

View File

@@ -121,7 +121,7 @@ Driver Compression
This refers to compressing traffic between the client and ScyllaDB.
Verify your client driver is using compressed traffic when connected to ScyllaDB.
As compression is driver settings dependent, please check your client driver manual or :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`.
As compression is driver settings dependent, please check your client driver manual. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
Connectivity
@@ -130,7 +130,7 @@ Connectivity
Drivers Settings
================
* Use shard aware drivers wherever possible. :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` (not third-party drivers) are shard aware.
* Use shard aware drivers wherever possible. `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ (not third-party drivers) are shard aware.
* Configure connection pool - open more connections (>3 per shard) and/Or more clients. See `this blog <https://www.scylladb.com/2019/11/20/maximizing-performance-via-concurrency-while-minimizing-timeouts-in-distributed-databases/>`_.
Management

View File

@@ -25,8 +25,8 @@ Actions
If your cluster is having timeouts during overload, check first if you are not making the overload situation worse through retries, and pay attention to the following:
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults.
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults.
* Make sure the server neither runs speculative retry nor runs it based on percentiles (as those can fluctuate aggressively). Server-side speculative retries are a per-table setting that can be changed with the ALTER TABLE command. See the :ref:`documentation <speculative-retry-options>` for details.

View File

@@ -9,9 +9,19 @@ To ensure a successful upgrade, follow
the :doc:`documented upgrade procedures <upgrade-guides/index>` tested by
ScyllaDB. This means that:
* You should perform the upgrades consecutively - to each successive X.Y
version, **without skipping any major or minor version**, unless there is
a documented upgrade procedure to bypass a version.
* You should follow the upgrade policy:
* Starting with version **2025.4**, upgrades can skip minor versions as long
as they remain within the same major version (for example, upgrading directly
from 2025.1 → 2025.4 is supported).
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
each successive X.Y version must be installed in order, **without skipping
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
is not supported).
* You cannot skip major versions. Upgrades must move from one major version to
the next using the documented major-version upgrade path.
* You should upgrade to a supported version of ScyllaDB.
See `ScyllaDB Version Support <https://docs.scylladb.com/stable/versioning/version-support.html>`_.
* Before you upgrade to the next version, the whole cluster (each node) must
be upgraded to the previous version.
* You cannot perform an upgrade by replacing the nodes in the cluster with new

Binary file not shown.

Before

Width:  |  Height:  |  Size: 28 KiB

View File

@@ -1,141 +0,0 @@
=====================
ScyllaDB CQL Drivers
=====================
.. toctree::
:titlesonly:
:hidden:
scylla-python-driver
scylla-java-driver
scylla-go-driver
scylla-gocqlx-driver
scylla-cpp-driver
scylla-rust-driver
ScyllaDB Drivers
-----------------
The following ScyllaDB drivers are available:
* :doc:`Python Driver</using-scylla/drivers/cql-drivers/scylla-python-driver>`
* :doc:`Java Driver </using-scylla/drivers/cql-drivers/scylla-java-driver>`
* :doc:`Go Driver </using-scylla/drivers/cql-drivers/scylla-go-driver>`
* :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
* :doc:`C++ Driver </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
* `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
* :doc:`Rust Driver </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
* `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
We recommend using ScyllaDB drivers. All ScyllaDB drivers are shard-aware and provide additional
benefits over third-party drivers.
ScyllaDB supports the CQL binary protocol version 3, so any Apache Cassandra/CQL driver that implements
the same version works with ScyllaDB.
CDC Integration with ScyllaDB Drivers
-------------------------------------------
The following table specifies which ScyllaDB drivers include a library for
:doc:`CDC </features/cdc/cdc-intro>`.
.. list-table::
:widths: 40 60
:header-rows: 1
* - ScyllaDB Driver
- CDC Connector
* - :doc:`Python </using-scylla/drivers/cql-drivers/scylla-python-driver>`
- |x|
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
- |v|
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
- |v|
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
- |x|
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
- |x|
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
- |x|
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
- |v|
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
- |x|
Support for Tablets
-------------------------
The following table specifies which ScyllaDB drivers support
:doc:`tablets </architecture/tablets>` and since which version.
.. list-table::
:widths: 30 35 35
:header-rows: 1
* - ScyllaDB Driver
- Support for Tablets
- Since Version
* - :doc:`Python</using-scylla/drivers/cql-drivers/scylla-python-driver>`
- |v|
- 3.26.5
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
- |v|
- 4.18.0 (Java Driver 4.x)
3.11.5.2 (Java Driver 3.x)
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
- |v|
- 1.13.0
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
- |x|
- N/A
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
- |x|
- N/A
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
- |v|
- All versions
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
- |v|
- 0.13.0
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
- |v|
- All versions
Driver Support Policy
-------------------------------
We support the **two most recent minor releases** of our drivers.
* We test and validate the latest two minor versions.
* We typically patch only the latest minor release.
We recommend staying up to date with the latest supported versions to receive
updates and fixes.
At a minimum, upgrade your driver when upgrading to a new ScyllaDB version
to ensure compatibility between the driver and the database.
Third-party Drivers
----------------------
You can find the third-party driver documentation on the GitHub pages for each driver:
* `DataStax Java Driver <https://github.com/datastax/java-driver/>`_
* `DataStax Python Driver <https://github.com/datastax/python-driver/>`_
* `DataStax C# Driver <https://github.com/datastax/csharp-driver/>`_
* `DataStax Ruby Driver <https://github.com/datastax/ruby-driver/>`_
* `DataStax Node.js Driver <https://github.com/datastax/nodejs-driver/>`_
* `DataStax C++ Driver <https://github.com/datastax/cpp-driver/>`_
* `DataStax PHP Driver (Supported versions: 7.1) <https://github.com/datastax/php-driver>`_
* `He4rt PHP Driver (Supported versions: 8.1 and 8.2) <https://github.com/he4rt/scylladb-php-driver/>`_
* `Scala Phantom Project <https://github.com/outworkers/phantom>`_
* `Xandra Elixir Driver <https://github.com/lexhide/xandra>`_
* `Exandra Elixir Driver <https://github.com/vinniefranco/exandra>`_
Learn about ScyllaDB Drivers on ScyllaDB University
----------------------------------------------------
The free `Using ScyllaDB Drivers course <https://university.scylladb.com/courses/using-scylla-drivers/>`_
on ScyllaDB University covers the use of drivers in multiple languages to interact with a ScyllaDB
cluster. The languages covered include Java, CPP, Rust, Golang, Python, Node.JS, Scala, and others.

View File

@@ -1,16 +0,0 @@
===================
ScyllaDB C++ Driver
===================
The ScyllaDB C++ driver is a modern, feature-rich and **shard-aware** C/C++ client library for ScyllaDB using exclusively Cassandras binary protocol and Cassandra Query Language v3.
This driver is forked from Datastax cpp-driver.
Read the `documentation <https://cpp-driver.docs.scylladb.com>`_ to get started or visit the Github project `ScyllaDB C++ driver <https://github.com/scylladb/cpp-driver>`_.
More Information
----------------
* `C++ Driver Documentation <https://cpp-driver.docs.scylladb.com>`_
* `C/C++ Driver course at ScyllaDB University <https://university.scylladb.com/courses/using-scylla-drivers/lessons/cpp-driver-part-1/>`_
* `Blog: A Shard-Aware ScyllaDB C/C++ Driver <https://www.scylladb.com/2021/03/18/a-shard-aware-scylla-c-c-driver/>`_

View File

@@ -1,28 +0,0 @@
==================
ScyllaDB Go Driver
==================
The `ScyllaDB Go driver <https://github.com/scylladb/gocql>`_ is shard aware and contains extensions for a tokenAwareHostPolicy supported by ScyllaDB 2.3 and onwards.
It is is a fork of the `GoCQL Driver <https://github.com/gocql/gocql>`_ but has been enhanced with capabilities that take advantage of ScyllaDB's unique architecture.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
The protocol extension spec is `available here <https://github.com/scylladb/scylla/blob/master/docs/dev/protocol-extensions.md>`_.
The ScyllaDB Go Driver is a drop-in replacement for gocql.
As such, no code changes are needed to use this driver.
All you need to do is rebuild using the ``replace`` directive in your ``mod`` file.
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/gocql>`_.
Using CDC with Go
-----------------
When writing applications, you can now use our `Go Library <https://github.com/scylladb/scylla-cdc-go>`_ to simplify writing applications that read from ScyllaDB CDC.
More information
----------------
* `ScyllaDB Gocql Driver project page on GitHub <https://github.com/scylladb/gocql>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-1/>`_
A three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Gocql driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Go application.

View File

@@ -1,16 +0,0 @@
=========================
ScyllaDB Gocql Extension
=========================
The ScyllaDB Gocqlx is an extension to gocql that provides usability features.
With gocqlx, you can bind the query parameters from maps and structs, use named query parameters (``:identifier``), and scan the query results into structs and slices.
The driver includes a fluent and flexible CQL query builder and a database migrations module.
More information
----------------
* `ScyllaDB Gocqlx Driver project page on GitHub <https://github.com/scylladb/gocqlx>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB Part 3 GoCQLX <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-3-gocqlx/>`_ - part three of the Golang three-part course which focuses on how to create a sample Go application that executes a few basic CQL statements with a ScyllaDB cluster using the GoCQLX package

View File

@@ -1,31 +0,0 @@
=====================
ScyllaDB Java Driver
=====================
ScyllaDB Java Driver is forked from `DataStax Java Driver <https://github.com/datastax/java-driver>`_ with enhanced capabilities, taking advantage of ScyllaDB's unique architecture.
The ScyllaDB Java driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
Use the ScyllaDB Java driver for better compatibility and support for ScyllaDB with Java-based applications.
Read the `documentation <https://java-driver.docs.scylladb.com/>`_ to get started or visit the `Github project <https://github.com/scylladb/java-driver>`_.
The driver architecture is based on layers. At the bottom lies the driver core.
This core handles everything related to the connections to a ScyllaDB cluster (for example, connection pool, discovering new nodes, etc.) and exposes a simple, relatively low-level API on top of which higher-level layers can be built.
The ScyllaDB Java Driver is a drop-in replacement for the DataStax Java Driver.
As such, no code changes are needed to use this driver.
Using CDC with Java
-------------------
When writing applications, you can now use our `Java Library <https://github.com/scylladb/scylla-cdc-java>`_ to simplify writing applications that read from ScyllaDB CDC.
More information
----------------
* `ScyllaDB Java Driver Docs <https://java-driver.docs.scylladb.com/>`_
* `ScyllaDB Java Driver project page on GitHub <https://github.com/scylladb/java-driver/>`_ - Source Code
* `ScyllaDB University: Coding with Java <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-java-part-1/>`_ - a three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Java driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Java application.

View File

@@ -1,20 +0,0 @@
======================
ScyllaDB Python Driver
======================
The ScyllaDB Python driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
Read the `documentation <https://python-driver.docs.scylladb.com/>`_ to get started or visit the Github project `ScyllaDB Python driver <https://github.com/scylladb/python-driver/>`_.
As the ScyllaDB Python Driver is a drop-in replacement for DataStax Python Driver, no code changes are needed to use the driver.
Use the ScyllaDB Python driver for better compatibility and support for ScyllaDB with Python-based applications.
More information
----------------
* `ScyllaDB Python Driver Documentation <https://python-driver.docs.scylladb.com/>`_
* `ScyllaDB Python Driver on GitHub <https://github.com/scylladb/python-driver/>`_
* `ScyllaDB University: Coding with Python <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-python/>`_

View File

@@ -1,24 +0,0 @@
=====================
ScyllaDB Rust Driver
=====================
The ScyllaDB Rust driver is a client-side, shard-aware driver written in pure Rust with a fully async API using Tokio.
Optimized for ScyllaDB, the driver is also compatible with Apache Cassandra®.
.. image:: ./images/monster-rust.png
:width: 150pt
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/scylla-rust-driver>`_.
Read the `Documentation <https://rust-driver.docs.scylladb.com>`_.
Using CDC with Rust
----------------------
When writing applications, you can use ScyllaDB's `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_
to simplify writing applications that read from ScyllaDB's CDC.
Use `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_ to read
:doc:`ScyllaDB's CDC </features/cdc/index>` update streams.

View File

@@ -1,9 +0,0 @@
========================
AWS DynamoDB Drivers
========================
ScyllaDB AWS DynamoDB Compatible API can be used with any AWS DynamoDB Driver.
For a list of AWS AWS DynamoDB drivers see `here <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.html>`_

View File

@@ -1,21 +0,0 @@
================
ScyllaDB Drivers
================
.. toctree::
:titlesonly:
:hidden:
ScyllaDB CQL Drivers <cql-drivers/index>
ScyllaDB DynamoDB Drivers <dynamo-drivers/index>
You can use ScyllaDB with:
* :doc:`Apache Cassandra CQL Compatible Drivers <cql-drivers/index>`
* :doc:`Amazon DynamoDB Compatible API Drivers <dynamo-drivers/index>`
Additional drivers coming soon!
If you are looking for a ScyllaDB Integration Solution or a Connector, refer to :doc:`ScyllaDB Integrations </using-scylla/integrations/index>`.

View File

@@ -9,7 +9,7 @@ ScyllaDB for Developers
Tutorials and Example Projects <https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html>
Learn to Use ScyllaDB <https://docs.scylladb.com/stable/get-started/learn-resources/index.html>
ScyllaDB Alternator <alternator/index>
ScyllaDB Drivers <drivers/index>
ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>
.. panel-box::
@@ -26,7 +26,7 @@ ScyllaDB for Developers
:id: "getting-started"
:class: my-panel
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` - ScyllaDB and third-party drivers for CQL and DynamoDB
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ - ScyllaDB and third-party drivers for CQL and DynamoDB
* :doc:`ScyllaDB Alternator </using-scylla/alternator/index>` - The Open Source DynamoDB-compatible API
* :doc:`CQL Reference </cql/index>` - Reference for the Apache Cassandra Query Language (CQL) and its ScyllaDB extensions

View File

@@ -28,7 +28,7 @@ ScyllaDB Integrations and Connectors
:class: my-panel
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`).
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
Any application which uses a CQL driver will work with ScyllaDB.
The list below contains links to integration projects using ScyllaDB with third-party projects.

View File

@@ -2,7 +2,7 @@
Integrate ScyllaDB with Databricks
==================================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
Resource list
-------------

View File

@@ -3,7 +3,7 @@ Integrate ScyllaDB with Elasticsearch
=====================================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
The list below contains integration projects using ScyllaDB with Elasticsearch. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.

View File

@@ -13,11 +13,11 @@ The Jaeger Query service offers a web-based UI and API for users to explore, vis
Jaeger also supports integration with other observability tools like Prometheus and Grafana,
making it a popular choice for monitoring modern distributed applications.
Jaeger Server `can also be run <https://github.com/jaegertracing/jaeger/tree/main/plugin/storage/scylladb>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
Jaeger Server `can also be run <https://www.jaegertracing.io/docs/2.11/storage/cassandra/#compatible-backends>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
As a drop-in replacement for Cassandra, ScyllaDB implements the same protocol and provides a high-performance,
low-latency alternative. This compatibility allows Jaeger users to easily switch to ScyllaDB without making significant changes to their setup.
Using ScyllaDB as the storage backend for Jaeger Server can offer additional benefits,
such as improved performance, scalability, and resource efficiency.
This makes Jaeger even more effective for monitoring and troubleshooting distributed applications,
especially in high-traffic, demanding environments where a high-performance storage solution is critical.
especially in high-traffic, demanding environments where a high-performance storage solution is critical.

View File

@@ -3,7 +3,7 @@ Integrate ScyllaDB with Spark
=============================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
The list below contains integration projects using ScyllaDB with Spark. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.

View File

@@ -15,6 +15,7 @@
#include "db/config.hh"
#include "utils/log.hh"
#include "utils/hash.hh"
#include "utils/http.hh"
#include "utils/rjson.hh"
#include "utils/base64.hh"
#include "utils/loading_cache.hh"
@@ -267,7 +268,6 @@ std::tuple<std::string, std::string> azure_host::impl::parse_key(std::string_vie
std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std::string_view vault) {
static const boost::regex vault_name_re(R"([a-zA-Z0-9-]+)");
static const boost::regex vault_endpoint_re(R"((https?)://([^/:]+)(?::(\d+))?)");
boost::smatch match;
std::string tmp{vault};
@@ -277,16 +277,12 @@ std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std
return {"https", fmt::format(AKV_HOST_TEMPLATE, vault), 443};
}
if (boost::regex_match(tmp, match, vault_endpoint_re)) {
std::string scheme = match[1];
std::string host = match[2];
std::string port_str = match[3];
unsigned port = (port_str.empty()) ? (scheme == "https" ? 443 : 80) : std::stoi(port_str);
return {scheme, host, port};
try {
auto info = utils::http::parse_simple_url(tmp);
return {info.scheme, info.host, info.port};
} catch (...) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault)));
}
throw std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault));
}
future<shared_ptr<tls::certificate_credentials>> azure_host::impl::make_creds() {

View File

@@ -816,6 +816,7 @@ public:
future<data_sink> wrap_sink(const sstables::sstable& sst, sstables::component_type type, data_sink sink) override {
switch (type) {
case sstables::component_type::Scylla:
case sstables::component_type::TemporaryScylla:
case sstables::component_type::TemporaryTOC:
case sstables::component_type::TOC:
co_return sink;
@@ -844,6 +845,7 @@ public:
sstables::component_type type,
data_source src) override {
switch (type) {
case sstables::component_type::TemporaryScylla:
case sstables::component_type::Scylla:
case sstables::component_type::TemporaryTOC:
case sstables::component_type::TOC:

View File

@@ -36,6 +36,7 @@
#include "encryption_exceptions.hh"
#include "symmetric_key.hh"
#include "utils.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
@@ -163,6 +164,8 @@ private:
shared_ptr<seastar::tls::certificate_credentials> _creds;
std::unordered_map<bytes, shared_ptr<symmetric_key>> _cache;
bool _initialized = false;
abort_source _as;
};
template<typename T, typename C>
@@ -251,24 +254,50 @@ future<rjson::value> encryption::gcp_host::impl::gcp_auth_post_with_retry(std::s
auto& creds = i->second;
int retries = 0;
static constexpr auto max_retries = 10;
for (;;) {
try {
co_await creds.refresh(KMS_SCOPE, _certs);
} catch (...) {
std::throw_with_nested(permission_error("Error refreshing credentials"));
exponential_backoff_retry exr(10ms, 10000ms);
bool do_backoff = false;
bool did_auth_retry = false;
for (int retry = 0; ; ++retry) {
if (std::exchange(do_backoff, false)) {
co_await exr.retry(_as);
}
bool refreshing = true;
try {
co_await creds.refresh(KMS_SCOPE, _certs);
refreshing = false;
auto res = co_await send_request(uri, _certs, body, httpd::operation_type::POST, key_values({
{ utils::gcp::AUTHORIZATION, utils::gcp::format_bearer(creds.token) },
}));
}), &_as);
co_return res;
} catch (httpd::unexpected_status_error& e) {
gcp_log.debug("{}: Got unexpected response: {}", uri, e.status());
if (e.status() == http::reply::status_type::unauthorized && retries++ < 3) {
// refresh access token and retry.
switch (e.status()) {
default:
if (http::reply::classify_status(e.status()) != http::reply::status_class::server_error) {
break;
}
[[fallthrough]];
case httpclient::reply_status::request_timeout:
if (retry < max_retries) {
// service unavailable etc -> backoff + retry
do_backoff = true;
did_auth_retry = false; // reset this, since we might cause expiration due to backoff (not really, but...)
continue;
}
break;
}
if (refreshing) {
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
if (e.status() == http::reply::status_type::unauthorized && retry < max_retries && !did_auth_retry) {
// refresh access token and retry. no backoff
did_auth_retry = true;
continue;
}
if (e.status() == http::reply::status_type::unauthorized) {
@@ -322,6 +351,7 @@ future<> encryption::gcp_host::impl::init() {
}
future<> encryption::gcp_host::impl::stop() {
_as.request_abort();
co_await _attr_cache.stop();
co_await _id_cache.stop();
}

View File

@@ -38,6 +38,7 @@
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/http.hh"
#include "marshal_exception.hh"
#include "db/config.hh"
@@ -322,17 +323,26 @@ future<> kmip_host::impl::connection::connect() {
f = f.then([this, cred] {
return cred->set_x509_trust_file(_options.truststore, seastar::tls::x509_crt_format::PEM);
});
} else {
f = f.then([cred] {
return cred->set_system_trust();
});
}
return f.then([this, cred] {
// TODO, find if we should do hostname verification
// TODO: connect all failovers already?
auto i = _host.find_last_of(':');
auto name = _host.substr(0, i);
auto port = i != sstring::npos ? std::stoul(_host.substr(i + 1)) : kmip_port;
// Use the URL parser to handle ipv6 etc proper.
// Turn host arg into a URL.
auto info = utils::http::parse_simple_url("kmip://" + _host);
auto name = info.host;
auto port = info.port != 80 ? info.port : kmip_port;
return seastar::net::dns::resolve_name(name).then([this, cred, port](seastar::net::inet_address addr) {
return seastar::tls::connect(cred, seastar::ipv4_addr{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
return seastar::net::dns::resolve_name(name).then([this, cred, port, name](seastar::net::inet_address addr) {
kmip_log.debug("Try connect {}:{}", addr, port);
// TODO: should we verify non-numeric hosts here? (opts.server_name)
// Adding this might break existing users with half-baked certs.
return seastar::tls::connect(cred, seastar::socket_address{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
kmip_log.debug("Successfully connected {}", _host);
// #998 Set keepalive to try avoiding connection going stale in between commands.
s.set_keepalive_parameters(net::tcp_keepalive_params{60s, 60s, 10});

View File

@@ -35,6 +35,7 @@
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/http.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/rjson.hh"
@@ -151,15 +152,10 @@ public:
{
// check if we have an explicit endpoint set.
if (!_options.endpoint.empty()) {
static std::regex simple_url(R"foo((https?):\/\/(?:([\w\.]+)|\[([\w:]+)\]):?(\d+)?\/?)foo");
std::transform(_options.endpoint.begin(), _options.endpoint.end(), _options.endpoint.begin(), ::tolower);
std::smatch m;
if (!std::regex_match(_options.endpoint, m, simple_url)) {
throw std::invalid_argument(fmt::format("Could not parse URL: {}", _options.endpoint));
}
_options.https = m[1].str() == "https";
_options.host = m[2].length() > 0 ? m[2].str() : m[3].str();
_options.port = m[4].length() > 0 ? std::stoi(m[4].str()) : 0;
auto info = utils::http::parse_simple_url(_options.endpoint);
_options.https = info.is_https();
_options.host = info.host;
_options.port = info.port;
}
if (_options.endpoint.empty() && _options.host.empty() && _options.aws_region.empty() && !_options.aws_use_ec2_region) {
throw std::invalid_argument("No AWS region or endpoint specified");

View File

@@ -129,6 +129,6 @@ struct direct_fd_ping_reply {
std::variant<std::monostate, service::wrong_destination, service::group_liveness_info> result;
};
verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
verb [[with_client_info, with_timeout, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
} // namespace service

View File

@@ -55,6 +55,7 @@ debian_base_packages=(
librapidxml-dev
libcrypto++-dev
libxxhash-dev
zlib1g-dev
slapd
ldap-utils
libcpp-jwt-dev
@@ -117,6 +118,7 @@ fedora_packages=(
makeself
libzstd-static libzstd-devel
lz4-static lz4-devel
zlib-ng-compat-devel
rpm-build
devscripts
debhelper

View File

@@ -157,6 +157,7 @@ adjust_bin() {
export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
export LD_LIBRARY_PATH="$prefix/libreloc"
export UBSAN_OPTIONS="${UBSAN_OPTIONS:+$UBSAN_OPTIONS:}suppressions=$prefix/libexec/ubsan-suppressions.supp"
${p11_trust_paths:+export SCYLLA_P11_TRUST_PATHS="$p11_trust_paths"}
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
EOF
chmod 755 "$root/$prefix/bin/$bin"
@@ -330,7 +331,6 @@ if ! $nonroot; then
rsysconfdir=$(realpath -m "$root/$sysconfdir")
rusr=$(realpath -m "$root/usr")
rsystemd=$(realpath -m "$rusr/lib/systemd/system")
rshare="$rprefix/share"
rdoc="$rprefix/share/doc"
rdata=$(realpath -m "$root/var/lib/scylla")
rhkdata=$(realpath -m "$root/var/lib/scylla-housekeeping")
@@ -338,7 +338,6 @@ else
retc="$rprefix/etc"
rsysconfdir="$rprefix/$sysconfdir"
rsystemd="$HOME/.config/systemd/user"
rshare="$rprefix/share"
rdoc="$rprefix/share/doc"
rdata="$rprefix"
fi
@@ -522,16 +521,6 @@ PRODUCT="$product"
EOS
chmod 644 "$rprefix"/scripts/scylla_product.py
install -d -m755 "$rshare"/p11-kit/modules
cat << EOS > "$rshare"/p11-kit/modules/p11-kit-trust.module
module: $prefix/libreloc/pkcs11/p11-kit-trust.so
priority: 1
trust-policy: yes
x-trust-lookup: pkcs11:library-description=PKCS%2311%20Kit%20Trust%20Module
disable-in: p11-kit-proxy
x-init-reserved: paths=$p11_trust_paths
EOS
if ! $nonroot && ! $without_systemd; then
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
install -m644 dist/common/systemd/scylla-server.service.d/dependencies.conf -Dt "$retc"/systemd/system/scylla-server.service.d

61
main.cc
View File

@@ -10,6 +10,8 @@
#include <functional>
#include <fmt/ranges.h>
#include <gnutls/pkcs11.h>
#include <seastar/util/closeable.hh>
#include <seastar/core/abort_source.hh>
#include "db/view/view_building_worker.hh"
@@ -37,7 +39,6 @@
#include "api/api_init.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/legacy_schema_migrator.hh"
#include "service/storage_service.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
@@ -118,15 +119,11 @@
#include "message/dictionary_service.hh"
#include "sstable_dict_autotrainer.hh"
#include "utils/disk_space_monitor.hh"
#include "auth/cache.hh"
#include "utils/labels.hh"
#include "tools/utils.hh"
#define P11_KIT_FUTURE_UNSTABLE_API
extern "C" {
#include <p11-kit/p11-kit.h>
}
namespace fs = std::filesystem;
#include <seastar/core/metrics_api.hh>
#include <seastar/core/relabel_config.hh>
@@ -708,14 +705,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
print_starting_message(ac, av, parsed_opts);
}
// We have to override p11-kit config path before p11-kit initialization.
// And the initialization will invoke on seastar initialization, so it has to
// be before app.run()
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe"));
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
auto p11_modules_str = p11_modules.string<char>();
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
sharded<locator::shared_token_metadata> token_metadata;
sharded<locator::effective_replication_map_factory> erm_factory;
sharded<service::migration_notifier> mm_notifier;
@@ -727,6 +716,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
service::load_meter load_meter;
sharded<service::storage_proxy> proxy;
sharded<auth::cache> auth_cache;
sharded<service::storage_service> ss;
sharded<service::migration_manager> mm;
sharded<tasks::task_manager> task_manager;
@@ -789,7 +779,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
&repair, &sst_loader, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
&hashing_worker, &vector_store_client] {
try {
if (opts.contains("relabel-config-file") && !opts["relabel-config-file"].as<sstring>().empty()) {
@@ -1650,7 +1640,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
fd.start(
std::ref(fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count()).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count(), dbcfg.gossip_scheduling_group).get();
auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
fd.stop().get();
@@ -1802,6 +1792,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "starting auth cache");
auth_cache.start(std::ref(qp)).get();
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [&] {
auth_cache.stop().get();
});
checkpoint(stop_signal, "initializing storage service");
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
@@ -1810,6 +1806,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
std::ref(messaging), std::ref(repair),
std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(view_builder), std::ref(view_building_worker), std::ref(qp), std::ref(sl_controller),
std::ref(auth_cache),
std::ref(tsm), std::ref(vbsm), std::ref(task_manager), std::ref(gossip_address_map),
compression_dict_updated_callback,
only_on_shard0(&*disk_space_monitor_shard0)
@@ -1825,11 +1822,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ss.stop().get();
});
api::set_server_storage_service(ctx, ss, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
checkpoint(stop_signal, "initializing query processor remote part");
// TODO: do this together with proxy.start_remote(...)
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),
@@ -1858,8 +1850,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
group0_client.init().get();
checkpoint(stop_signal, "initializing system schema");
// schema migration, if needed, is also done on shard 0
db::legacy_schema_migrator::migrate(proxy, db, sys_ks, qp.local()).get();
db::schema_tables::save_system_schema(qp.local()).get();
db::schema_tables::recalculate_schema_version(sys_ks, proxy, feature_service.local()).get();
@@ -2070,7 +2060,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(hashing_worker)).get();
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache), std::ref(hashing_worker)).get();
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
@@ -2184,6 +2174,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// This will also disable migration manager schema pulls if needed.
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
api::set_server_storage_service(ctx, ss, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
with_scheduling_group(maintenance_scheduling_group, [&] {
return messaging.invoke_on_all([&] (auto& ms) {
return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {
@@ -2341,7 +2336,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(hashing_worker)).get();
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache), std::ref(hashing_worker)).get();
std::any stop_auth_service;
// Has to be called after node joined the cluster (join_cluster())
@@ -2687,13 +2682,15 @@ int main(int ac, char** av) {
// #3583 - need to potentially ensure this for tools as well, since at least
// sstable* might need crypto libraries.
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe")); // could just be argv[0] I guess...
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
// Note: must be in scope for application lifetime. p11_kit_override_system_files does _not_
// copy input strings.
auto p11_modules_str = p11_modules.string<char>();
// #3392 only do this if we are actually packaged and the path exists.
if (fs::exists(p11_modules)) {
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
auto p11_trust_paths_from_env = std::getenv("SCYLLA_P11_TRUST_PATHS");
auto trust_module_path = scylla_path.parent_path().parent_path().append("libreloc/pkcs11/p11-kit-trust.so");
if (fs::exists(trust_module_path) && p11_trust_paths_from_env) {
gnutls_pkcs11_init(GNUTLS_PKCS11_FLAG_MANUAL, nullptr);
auto trust_config = fmt::format("p11-kit:paths={} trusted=yes", p11_trust_paths_from_env);
auto ret = gnutls_pkcs11_add_provider(trust_module_path.string().c_str(), trust_config.c_str());
if (ret != GNUTLS_E_SUCCESS) {
startlog.warn("Could not initialize p11-kit trust module: {}\n", gnutls_strerror(ret));
}
}
return main_func(ac, av);

View File

@@ -686,6 +686,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::RAFT_PULL_SNAPSHOT:
case messaging_verb::NOTIFY_BANNED:
case messaging_verb::DIRECT_FD_PING:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
@@ -747,7 +748,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
case messaging_verb::DIRECT_FD_PING:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:

View File

@@ -575,10 +575,15 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
}
res.row.set_range_tombstone(cur.range_tombstone_for_row() + src_cur.range_tombstone());
if (need_preempt()) {
lb = position_in_partition(cur.position());
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
return stop_iteration::no;
}
// FIXME: Compact the row
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
cur.next();
// FIXME: preempt
}
}
{

View File

@@ -14,6 +14,7 @@
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
#include "utils/UUID_gen.hh"
#include <variant>
#include "utils/overloaded_functor.hh"
@@ -90,6 +91,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
.scope = "cluster",
.state = get_state(entry),
.is_abortable = co_await is_abortable(std::move(hint)),
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(id.uuid())),
.start_time = entry.start_time,
.end_time = entry.end_time,
.error = entry.error,
@@ -167,6 +169,7 @@ future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
.table = "",
.entity = "",
.shard = 0,
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(id)),
.start_time = entry.start_time,
.end_time = entry.end_time
};

View File

@@ -2329,11 +2329,7 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now()- start);
rlogger.info("repair[{}]: Finished tablet repair for table={}.{} range={} duration={} replicas={} global_tablet_id={} flush_time={}",
id.uuid(), keyspace_name, table_name, range, duration, replicas, gid, flush_time);
if (!flush_time.has_value()) {
throw std::runtime_error(format("Batchlog reply failed for table={}.{} range={} replicas={} global_tablet_id={}",
id.uuid(), keyspace_name, table_name, range, replicas, gid));
}
co_return flush_time.value();
co_return flush_time;
}
tasks::is_user_task repair::tablet_repair_task_impl::is_user_task() const noexcept {
@@ -2410,9 +2406,11 @@ future<> repair::tablet_repair_task_impl::run() {
});
auto parent_shard = this_shard_id();
std::vector<std::optional<gc_clock::time_point>> flush_times(smp::count, gc_clock::time_point{});
rs.container().invoke_on_all([&idx, &flush_times, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<> {
auto flush_time = _flush_time;
auto res = rs.container().map_reduce0([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<std::pair<gc_clock::time_point, bool>> {
std::exception_ptr error;
gc_clock::time_point shard_flush_time;
bool flush_failed = false;
for (auto& m : metas) {
if (m.master_shard_id != this_shard_id()) {
continue;
@@ -2466,27 +2464,24 @@ future<> repair::tablet_repair_task_impl::run() {
error = std::move(ep);
}
}
auto current = flush_times[this_shard_id()];
if ((needs_flush_before_repair &&!hints_batchlog_flushed) || !current.has_value()) {
flush_times[this_shard_id()] = std::nullopt;
} else {
auto time = task->get_flush_time();
flush_times[this_shard_id()] = current == gc_clock::time_point() ? time : std::min(current.value(), time);
}
auto time = task->get_flush_time();
shard_flush_time = shard_flush_time == gc_clock::time_point() ? time : std::min(shard_flush_time, time);
flush_failed = flush_failed || (needs_flush_before_repair && !hints_batchlog_flushed);
}
if (error) {
co_await coroutine::return_exception_ptr(std::move(error));
}
co_return std::make_pair(shard_flush_time, flush_failed);
}, std::make_pair<gc_clock::time_point, bool>(std::move(flush_time), false), [] (const auto& p1, const auto& p2) {
auto& [time1, failed1] = p1;
auto& [time2, failed2] = p2;
auto flush_time = time1 == gc_clock::time_point() ? time2 :
(time2 == gc_clock::time_point() ? time1 : std::min(time1, time2));
auto failed = failed1 || failed2;
return std::make_pair(flush_time, failed);
}).get();
for (auto& time : flush_times) {
if (!time.has_value()) {
_flush_time = std::nullopt;
break;
}
if (time != gc_clock::time_point()) {
_flush_time = _flush_time == gc_clock::time_point() ? time : std::min(_flush_time.value(), time.value());
}
}
_flush_time = res.first;
_should_flush_and_flush_failed = res.second;
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
rlogger.info("repair[{}]: Finished user-requested repair for tablet keyspace={} tables={} repair_id={} tablets_repaired={} duration={}",
id.uuid(), _keyspace, _tables, id.id, _metas.size(), duration);

View File

@@ -2529,7 +2529,7 @@ future<repair_update_system_table_response> repair_service::repair_update_system
}
}
if (req.range.end()) {
if (!req.range.end()->is_inclusive()) {
if (!req.range.end()->is_inclusive() && req.range.end()->value() != dht::maximum_token()) {
is_valid_range = false;
}
}

View File

@@ -112,7 +112,8 @@ private:
optimized_optional<abort_source::subscription> _abort_subscription;
std::optional<int> _ranges_parallelism;
size_t _metas_size = 0;
std::optional<gc_clock::time_point> _flush_time = gc_clock::time_point();
gc_clock::time_point _flush_time = gc_clock::time_point();
bool _should_flush_and_flush_failed = false;
service::frozen_topology_guard _topo_guard;
bool _skip_flush;
public:
@@ -134,7 +135,12 @@ public:
return tasks::is_abortable(!_abort_subscription);
}
std::optional<gc_clock::time_point> get_flush_time() const { return _flush_time; }
gc_clock::time_point get_flush_time() const {
if (_should_flush_and_flush_failed) {
throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
}
return _flush_time;
}
tasks::is_user_task is_user_task() const noexcept override;
virtual future<> release_resources() noexcept override;

View File

@@ -3704,7 +3704,7 @@ future<utils::chunked_vector<temporary_buffer<char>>> database::sample_data_file
}), std::ref(state));
// [1, 2, 3, 0] --> [0, 1, 3, 6]
std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), 0, std::plus());
std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), uint64_t(0), std::plus());
// We can't generate random non-negative integers smaller than 0,
// so let's just deal with the `total_chunks == 0` case with an early return.

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "seastar/core/scheduling.hh"
#include "utils/assert.hh"
#include <unordered_set>
@@ -17,6 +18,7 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/switch_to.hh>
#include "utils/log.hh"
@@ -118,7 +120,7 @@ struct failure_detector::impl {
// Fetches endpoint updates from _endpoint_queue and performs the add/remove operation.
// Runs on shard 0 only.
future<> update_endpoint_fiber();
future<> update_endpoint_fiber(seastar::scheduling_group sg);
future<> _update_endpoint_fiber = make_ready_future<>();
// Workers running on this shard.
@@ -140,7 +142,7 @@ struct failure_detector::impl {
// The unregistering process requires cross-shard operations which we perform on this fiber.
future<> _destroy_subscriptions = make_ready_future<>();
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout);
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg);
~impl();
// Inform update_endpoint_fiber() about an added/removed endpoint.
@@ -177,19 +179,19 @@ struct failure_detector::impl {
};
failure_detector::failure_detector(
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout))
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout, sg))
{}
failure_detector::impl::impl(
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _parent(parent), _pinger(pinger), _clock(clock), _ping_period(ping_period), _ping_timeout(ping_timeout) {
if (this_shard_id() != 0) {
return;
}
_num_workers.resize(smp::count, 0);
_update_endpoint_fiber = update_endpoint_fiber();
_update_endpoint_fiber = update_endpoint_fiber(sg);
}
void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoint_update update) {
@@ -205,9 +207,9 @@ void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoi
_endpoint_changed.signal();
}
future<> failure_detector::impl::update_endpoint_fiber() {
future<> failure_detector::impl::update_endpoint_fiber(seastar::scheduling_group sg) {
SCYLLA_ASSERT(this_shard_id() == 0);
co_await coroutine::switch_to(sg);
while (true) {
co_await _endpoint_changed.wait([this] { return !_endpoint_updates.empty(); });
@@ -480,7 +482,7 @@ static future<bool> ping_with_timeout(pinger::endpoint_id id, clock::timepoint_t
}
});
auto f = pinger.ping(id, timeout_as);
auto f = pinger.ping(id, timeout, timeout_as, c);
auto sleep_and_abort = [] (clock::timepoint_t timeout, abort_source& timeout_as, clock& c) -> future<> {
co_await c.sleep_until(timeout, timeout_as).then_wrapped([&timeout_as] (auto&& f) {
// Avoid throwing if sleep was aborted.

View File

@@ -19,26 +19,6 @@ class abort_source;
namespace direct_failure_detector {
class pinger {
public:
// Opaque endpoint ID.
// A specific implementation of `pinger` maps those IDs to 'real' addresses.
using endpoint_id = utils::UUID;
// Send a message to `ep` and wait until it responds.
// The wait can be aborted using `as`.
// Abort should be signalized with `abort_requested_exception`.
//
// If the ping fails in an expected way (e.g. the endpoint is down and refuses to connect),
// returns `false`. If it succeeds, returns `true`.
virtual future<bool> ping(endpoint_id ep, abort_source& as) = 0;
protected:
// The `pinger` object must not be destroyed through the `pinger` interface.
// `failure_detector` does not take ownership of `pinger`, only a non-owning reference.
~pinger() = default;
};
// A clock that uses abstract units to measure time.
// The implementation is responsible for periodically advancing the clock.
//
@@ -60,12 +40,33 @@ public:
// Aborts should be signalized using `seastar::sleep_aborted`.
virtual future<> sleep_until(timepoint_t tp, abort_source& as) = 0;
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const = 0;
protected:
// The `clock` object must not be destroyed through the `clock` interface.
// `failure_detector` does not take ownership of `clock`, only a non-owning reference.
~clock() = default;
};
class pinger {
public:
// Opaque endpoint ID.
// A specific implementation of `pinger` maps those IDs to 'real' addresses.
using endpoint_id = utils::UUID;
// Send a message to `ep` and wait until it responds.
// The wait can be aborted using `as`.
// Abort should be signalized with `abort_requested_exception`.
//
// If the ping fails in an expected way (e.g. the endpoint is down and refuses to connect),
// returns `false`. If it succeeds, returns `true`.
virtual future<bool> ping(endpoint_id ep, clock::timepoint_t timeout, abort_source& as, clock& c) = 0;
protected:
// The `pinger` object must not be destroyed through the `pinger` interface.
// `failure_detector` does not take ownership of `pinger`, only a non-owning reference.
~pinger() = default;
};
class listener {
public:
// Called when an endpoint in the detected set (added by `failure_detector::add_endpoint`) responds to a ping
@@ -127,7 +128,10 @@ public:
// Duration after which a ping is aborted, so that next ping can be started
// (pings are sent sequentially).
clock::interval_t ping_timeout
clock::interval_t ping_timeout,
// Scheduling group used for fibers inside the failure detector.
seastar::scheduling_group sg
);
~failure_detector();

View File

@@ -301,6 +301,7 @@ protected:
class ghost_row_deleting_query_pager : public service::pager::query_pager {
service::storage_proxy& _proxy;
db::timeout_clock::duration _timeout_duration;
size_t _concurrency;
public:
ghost_row_deleting_query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
@@ -309,10 +310,12 @@ public:
dht::partition_range_vector ranges,
cql3::cql_stats& stats,
service::storage_proxy& proxy,
db::timeout_clock::duration timeout_duration)
db::timeout_clock::duration timeout_duration,
size_t concurrency)
: query_pager(proxy, s, selection, state, options, std::move(cmd), std::move(ranges), std::nullopt)
, _proxy(proxy)
, _timeout_duration(timeout_duration)
, _concurrency(concurrency)
{}
virtual ~ghost_row_deleting_query_pager() {}
@@ -322,8 +325,12 @@ public:
_query_read_repair_decision = qr.read_repair_decision;
qr.query_result->ensure_counts();
return seastar::async([this, query_result = std::move(qr.query_result), page_size, now] () mutable -> result<> {
handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration},
std::exception_ptr ex;
handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration, _concurrency, ex},
std::move(query_result), page_size, now);
if (ex) {
std::rethrow_exception(ex);
}
return bo::success();
});
}));
@@ -503,7 +510,8 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
dht::partition_range_vector ranges,
cql3::cql_stats& stats,
storage_proxy& proxy,
db::timeout_clock::duration duration) {
db::timeout_clock::duration duration,
size_t concurrency) {
return ::make_shared<ghost_row_deleting_query_pager>(std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges), stats, proxy, duration);
options, std::move(cmd), std::move(ranges), stats, proxy, duration, concurrency);
}

View File

@@ -47,7 +47,8 @@ public:
dht::partition_range_vector,
cql3::cql_stats& stats,
storage_proxy& proxy,
db::timeout_clock::duration timeout_duration);
db::timeout_clock::duration timeout_duration,
size_t concurrency);
};
}

Some files were not shown because too many files have changed in this diff Show More