Compare commits

...

38 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
98fafb25b2 Address code review comments: improve documentation and exception handling
- Add detailed comments explaining leaf depth calculation
- Document prefix encoding format (length in lower 7 bits, value in upper bits)
- Replace bare except clauses with specific exception types
- Catch only relevant exceptions (gdb.error, MemoryError, ValueError, AttributeError)

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 22:08:05 +00:00
copilot-swe-agent[bot]
b17de07c43 Enhance compact_radix_tree wrapper with better documentation and error handling
- Add comprehensive usage examples in docstring
- Improve error messages for optimized builds
- Document limitations and workarounds
- Show tree size and layout info when elements not accessible
- Provide guidance for users encountering limitations

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 22:06:05 +00:00
copilot-swe-agent[bot]
4b7f760a38 Implement compact_radix_tree wrapper with std_map-like API
Add wrapper class for compact_radix_tree that provides:
- Iteration over elements (__iter__)
- Indexing by column id (__getitem__)
- Dictionary-like methods (keys, values, items, get)
- Length support (__len__)

Note: Full tree traversal is limited by compiler optimizations
and GDB's inability to call C++ template methods directly.
The implementation provides the API framework with best-effort
element collection.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 22:04:15 +00:00
copilot-swe-agent[bot]
c824803a24 Initial plan 2025-12-08 21:56:45 +00:00
Amnon Heiman
a213e41250 scylla-node-exporter: Add ethtool to node exporter
AWS suggests following multiple network performance metrics:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/monitoring-network-performance-ena.html#network-performance-metrics

This patch enables the ethtool collector with the specific list of
metrics

Ater this patch the relevant metris looks like:

$ curl http://localhost:9100/metrics |& grep ethtool
node_ethtool_bw_in_allowance_exceeded{device="ens5"} 0
node_ethtool_bw_out_allowance_exceeded{device="ens5"} 0
node_ethtool_conntrack_allowance_available{device="ens5"} 51303
node_ethtool_conntrack_allowance_exceeded{device="ens5"} 0
node_ethtool_info{bus_info="0000:00:05.0",device="ens5",driver="ena",expansion_rom_version="",firmware_version="",version="6.14.0-1015-aws"} 1
node_ethtool_linklocal_allowance_exceeded{device="ens5"} 0
node_scrape_collector_duration_seconds{collector="ethtool"} 0.001091436
node_scrape_collector_success{collector="ethtool"} 1

Signed-off-by: Amnon Heiman <amnon@scylladb.com>

Closes scylladb/scylladb#27358
2025-12-08 14:27:10 +02:00
Dawid Mędrek
58dc414912 test/cluster/mv: Rewrite test_view_building_scheduling_group
We rewrite the test to avoid flakiness. Instead of looking at the
metrics, we make a trade-off and start depending on a less reliable
mechanism -- logs. We grep all relevant messages printed by Scylla
in TRACE mode and make sure that they were all printed from a context
using the streaming scheduling group.

Although it's a "less proper" way of testing, it should be much more
dependable and avoid flakiness.

Fixes scylladb/scylladb#25957

Closes scylladb/scylladb#26656
2025-12-08 14:24:25 +02:00
Ferenc Szili
d883ff2317 test: fix flakyness caused by TRUNCATE retries
The test test_truncate_during_topology_change tests TRUNCATE TABLE while
bootstrapping a new node. With tablets enabled TRUNCATE is a global
topology operation which needs to serialize with boostrap.

When TRUNCATE TABLE is issued, it first checks if there is an already
queued truncate for the same table. This can happen if a previous
TRUNCATE operation has timed out, and the client retried. The newly
issued truncate will only join the queued one if it is waiting to be
processed, and will fail immediatelly if the TRUNCATE is already being
processed.

In this test, TRUNCATE will be retried after a timeout (1 minute) due to
the default retry policy, and will be retried up to 3 times, while the
bootstrap is delayed by 2 minutes. This means that the test can validate
the result of a truncate which was started after bootstrap was
completed.

Because of the way truncate joins existing truncate operations, we can
also have the following scenario:
- TRUNCATE times out after one minute because the new node is being
  bootstrapped
- the client retries the TRUNCATE command which also times out after 1m
- the third attempt is received during TRUNCATE being processed which
  fails the test

This patch changes the retry policy of the TRUNCATE operation to
FallthroughRetryPolicy which guarantees that TRUNCATE will not be
retried on timeout. It also increases the timeout of the TRUNCATE from 1
to 4 minutes. This way the test will actually validate the performance
of the TRUNCATE operation which was issued during bootstrap, instead of
the subsequent, retried TRUNCATEs which could have been issued after the
bootstrap was complete.

Fixes: #26347

Closes scylladb/scylladb#27245
2025-12-08 14:13:26 +02:00
dependabot[bot]
1f777da863 build(deps): bump sphinx-scylladb-theme from 1.8.9 to 1.8.10 in /docs
Bumps [sphinx-scylladb-theme](https://github.com/scylladb/sphinx-scylladb-theme) from 1.8.9 to 1.8.10.
- [Release notes](https://github.com/scylladb/sphinx-scylladb-theme/releases)
- [Commits](https://github.com/scylladb/sphinx-scylladb-theme/commits)

---
updated-dependencies:
- dependency-name: sphinx-scylladb-theme
  dependency-version: 1.8.10
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Closes scylladb/scylladb#27468
2025-12-08 13:40:51 +02:00
Asias He
faad0167d7 repair: Add tablet repair progress report support
This patch adds tablet repair progress report support so that the user
could use the /task_manager/task_status API to query the progress.

In order to support this, a new system table is introduced to record the
user request related info, i.e, start of the request and end of the
request.

The progress is accurate when tablet split or merge happens in the
middle of the request, since the tokens of the tablet are recorded when
the request is started and when repair of each tablet is finished. The
original tablet repair is considered as finished when the finished
ranges cover the original tablet token ranges.

After this patch, the /task_manager/task_status API will report correct
progress_total and progress_completed.

Fixes #22564
Fixes #26896

Closes scylladb/scylladb#26924
2025-12-08 13:35:19 +02:00
Andrei Chekun
0115a21b9a test.py: fail test when timeout reached for boost test
There is a bug in current pytest's boost implementation. When timeout
reached process will be killed, but it was not correctly propagated,
that lead to a false positive result. This will fail test case when
timeout for the process is reached.
This is to prevent issues like this https://github.com/scylladb/scylladb/issues/27237

Closes scylladb/scylladb#27463
2025-12-08 11:49:46 +01:00
Pavel Emelyanov
8192f45e84 Merge 'Add option to use sstable identifier in snapshot' from Benny Halevy
This change adds a new option to the REST api and correspondingly, to scylla nodetool: use_sstable_identifier.
When set, we use the sstable identifier, if available, to name each sstable in the snapshots directory
and the manifest.json file, rather than using the sstable generation.

This can be used by the user (e.g. Scylla Manager) for global deduplication with tablets, where an sstable
may be migrated across shards or across nodes, and in this case, its generation may change, but its
sstable identifier remains sstable.

Currently, Scylla manager uses the sstable generation to detect sstables that are already backed up to
object storage and exist in previous backed up snapshots.
Historically, the sstable generation was guaranteed to be unique only per table per node,
so the dedup code currently checks for deduplication in the node scope.

However, with tablet migration, sstables are renamed when migrated to a different shard,
i.e. their generation changes, and they may be renamed when migrated to another node,
but even if they are not, the dedup logic still assumes uniqueness only within a node.

To address both cases, we keep the sstable_id stable throughout the sstable life cycle (since 3a12ad96c7).
Given the globally unique sstable identifier, scylla manager can now detect duplicate sstables
in a wider scope.  This can be cluster-wide, but we practically need only rack-wide deduplication
or dc-wide, as tablets are migrated across racks only in rare occasions (like when converting from a
numerical replication factor to a rack list containing a subset of the available racks in a datacenter).

Fixes #27181

* New feature, no backport required

Closes scylladb/scylladb#27184

* github.com:scylladb/scylladb:
  database: truncate_table_on_all_shards: set use_sstable_identifier to true
  nodetool: snapshot: add --use-sstable-identifier option
  api: storage_service: take_snapshot: add use_sstable_identifier option
  test: database_test: add snapshot_use_sstable_identifier_works
  test: database_test: snapshot_works: add validate_manifest
  sstable: write_scylla_metadata: add random_sstable_identifier error injection
  table: snapshot_on_all_shards: take snapshot_options
  sstable: add get_format getter
  sstable: snapshot: add use_sstable_identifier option
  db: snapshot_ctl: snapshot_options: add use_sstable_identifier options
  db: snapshot_ctl: move skip_flush to struct snapshot_options
2025-12-08 12:56:12 +03:00
Avi Kivity
45c16553eb Revert "Update tools/cqlsh submodule"
This reverts commit ff1b212319. In this
commit, the python driver was updated to 3.29.6. That version has a
serious flaw - it rejects compression=None settings [1] which
cqlsh (legitimately) uses in copyutil.py.

The reason this hasn't caused numerous continuous integration failures
is that the submodule update commit did not update the frozen toolchain,
so the build was effectively running with an older version of the driver.

Fix by reverting the change. This allows us to regenerate the frozen
toolchain when we need to.

Reverted changes:

* tools/cqlsh 2240122...6badc99 (2):
  > Update scylla-driver version to 3.29.6
  > Revert "Migrate workflows to Blacksmith"

[1] 78f554236f

Closes scylladb/scylladb#27473
2025-12-08 08:50:52 +02:00
Nadav Har'El
c984f557ef Merge 'alternator: eliminate cross shard ::free for do_batch_write' from Petr Gusev
This is an optimization follow-up [for this PR](https://github.com/scylladb/scylladb/pull/27396#issuecomment-3611410774): avoiding destruction of foreign objects on the wrong shard. Releasing objects allocated on a different shard causes their ::free calls to be executed remotely, which adds unnecessary load to the SMP subsystem.

Before this PR, a `std::vector<put_or_delete_item>` could be moved to another shard. When the vector was eventually destroyed, its ::free had to be marshalled back to the shard where the memory had originally been allocated. This change avoids that overhead by passing the vector by const reference instead.

backport: not needed, this is an optimization

Closes scylladb/scylladb#27432

* github.com:scylladb/scylladb:
  alternator/executor.cc: avoid cross-shard free
  storage_proxy: cas: take cas_request by raw reference
2025-12-07 22:54:36 +02:00
Andrei Chekun
5e83311305 test.py: switch to ThreadPoolExecutor
With python 3.14, the Process fails due to pickling issue with nodes objects.
This will eliminate this issue, so we can bump up the python version.

Closes scylladb/scylladb#27456
2025-12-07 17:37:25 +02:00
Petr Gusev
f00f7976c1 alternator/executor.cc: avoid cross-shard free
This commit is an optimization: avoiding destruction of
foreign objects on the wrong shard. Releasing objects allocated on a
different shard causes their ::free calls to be executed remotely,
which adds unnecessary load to the SMP subsystem.

Before this patch, a std::vector could be moved
to another shard. When the vector was eventually destroyed,
its ::free had to be marshalled back to the shard where the memory had
originally been allocated. This change avoids that overhead by passing
the vector by const reference instead.

The referenced objects lifetime correctness reasoning:
* the put_or_delete_item refs usages in put_or_delete_item_cas_request
are bound to its lifetime
* cas_request lifetime is bound to storage_proxy::cas future
* we don't release put_or_delete_item-s untill all storage_proxy::cas
calls are done.
2025-12-07 16:14:56 +01:00
Petr Gusev
c428645d16 storage_proxy: cas: take cas_request by raw reference
In the next commit we want to add an optimization that relies on
precise control over the lifetime of cas_request. In particular, we
want the implementation of this interface in Alternator to operate on
raw references that are guaranteed to remain valid only until the
cas() future is resolved. We already depend on the same lifetime
assumptions in cas_request when used by modification_statement.
However, these assumptions are not clearly expressed in the current
interface: cas_request is taken by shared_ptr, and nothing prevents
cas() from storing that pointer inside paxos_response_handler, which
may outlive the cas() future.

This commit fixes that by taking cas_request by raw reference. This
makes it explicit that cas() does not assume ownership of the object.
Callers must ensure that the referenced object remains valid until
the returned future is resolved.
2025-12-07 16:14:56 +01:00
Tomasz Grabiec
082342ecad Attach names to allocating sections for better debuggability
Large reserves in allocating_section can cause stalls. We already log
reserve increase, but we don't know which table it belongs to:

  lsa - LSA allocation failure, increasing reserve in section 0x600009f94590 to 128 segments;

Allocating sections used for updating row cache on memtable flush are
notoriously problematic. Each table has its own row_cache, so its own
allocating_section(s). If we attached table name to those sections, we
could identify which table is causing problems. In some issues we
suspected system.raft, but we can't be sure.

This patch allows naming allocating_sections for the purpose of
identifying them in such log messages. I use abstract_formatter for
this purpose to avoid the cost of formatting strings on the hot path
(e.g. index_reader). And also to avoid duplicating strings which are
already stored elsewhere.

Fixes #25799

Closes scylladb/scylladb#27470
2025-12-07 14:14:25 +02:00
Avi Kivity
47efbdffbc Merge 'cache, mvcc: Preempt cache update when applying range tombstone from memtable' from Tomasz Grabiec
Range tombstones are represented as entry attributes, which applies to
the interval between entries. So if a range tombstone covers many
rows, to apply it we have to update all covered entries.  In some
workloads that could be many entries, even the whole cache.  Before
the patch, we did this update without preemption, which can cause
reactor stalls in such workloads.

This scenario is already covered by mvcc_tests,
e.g. test_apply_to_incomplete_respects_continuity. And I verified that
the new preemption point is hit in the test.

perf-row-cache-update results show no significant stalls anymore (max
2ms scheduling delay, instead of previous 1.5 s):

    Generated 1124195 rows
    Memtable fill took 4179.457520 [ms], {count: 8295, 99%: 0.654949 [ms], max: 32.817176 [ms]}
    Draining...
    took 0.000616 [ms]
    cache: 2506/2948 [MB], memtable: 781/1024 [MB], alloc/comp: 1051/662 [MB] (amp: 0.630)
    update: 2874.157471 [ms], preemption: {count: 26650, 99%: 1.131752 [ms], max: 2.068762 [ms]}, cache: 3027/3973 [MB], alloc/comp: 3951/2424 [MB] (amp: 0.614), pr/me/dr 1124195/0/0

Fixes #23479
Fixes #2578

Closes scylladb/scylladb#27469

* github.com:scylladb/scylladb:
  cache, mvcc: Preempt cache update when applying range tombstone from memtable
  partition_snapshot_row_cursor: Clarify non-obvious semantic difference of range_tombstone()
  perf-row-cache-update: Add scenario with large tombstone covering many rows
2025-12-07 11:54:15 +02:00
Avi Kivity
d811eeb4ca Merge 'Make direct failure detector verb handler more efficient' from Gleb Natapov
We saw that in large clusters direct failure detector may cause large task queues to be accumulated. The series address this issue and also moves the code into the correct scheduling group.

Fixes https://github.com/scylladb/scylladb/issues/27142

Backport to all version where 60f1053087 was backported to since it should improve performance in large clusters.

Closes scylladb/scylladb#27387

* github.com:scylladb/scylladb:
  direct_failure_detector: run direct failure detector in the gossiper scheduling group
  raft: drop invoke_on from the pinger verb handler
  direct_failure_detector: pass timeout to direct_fd_ping verb
2025-12-07 11:40:26 +02:00
Marcin Maliszkiewicz
4784e39665 auth: fix ctor signature of certificate_authenticator
In b9199e8b24 we
added cache argument to constructor of authenticators
but certificate_authenticator was ommited. Class
registrator sadly only fails in runtime for such
cases.

Fixes https://github.com/scylladb/scylladb/issues/27431

Closes scylladb/scylladb#27434
2025-12-07 11:18:42 +02:00
Tomasz Grabiec
d4014b7970 Drop legacy schema support
We switched to using v3 schema tables (in system_schema keyspace) in
2017, in 9eb91bc30b.

So no system should have the old schema any more.

No need to run legacy_schema_migrator on boot.

Closes scylladb/scylladb#27420
2025-12-07 00:09:13 +02:00
Tomasz Grabiec
92b5e4d63d cache, mvcc: Preempt cache update when applying range tombstone from memtable
Range tombstones are represented as entry attributes, which applies to
the interval between entries. So if a range tombstone covers many
rows, to apply it we have to update all covered entries.  In some
workloads that could be many entries, even the whole cache.  Before
the patch, we did this update without preemption, which can cause
reactor stalls in such workloads.

This scenario is already covered by mvcc_tests,
e.g. test_apply_to_incomplete_respects_continuity. And I verified that
the new preemption point is hit in the test.

perf-row-cache-update results show no significant stalls anymore (max
2ms scheduling delay, instead of previous 1.5 s):

Generated 1124195 rows
Memtable fill took 4179.457520 [ms], {count: 8295, 99%: 0.654949 [ms], max: 32.817176 [ms]}
Draining...
took 0.000616 [ms]
cache: 2506/2948 [MB], memtable: 781/1024 [MB], alloc/comp: 1051/662 [MB] (amp: 0.630)
update: 2874.157471 [ms], preemption: {count: 26650, 99%: 1.131752 [ms], max: 2.068762 [ms]}, cache: 3027/3973 [MB], alloc/comp: 3951/2424 [MB] (amp: 0.614), pr/me/dr 1124195/0/0

Fixes #23479
Fixes #2578
2025-12-06 13:45:35 +01:00
Tomasz Grabiec
e546143fd9 partition_snapshot_row_cursor: Clarify non-obvious semantic difference of range_tombstone() 2025-12-06 01:03:10 +01:00
Tomasz Grabiec
721434054b perf-row-cache-update: Add scenario with large tombstone covering many rows
Fills memtable with rows and a tombstone which deletes all rows which
are already in cache.

Similar to raft log workload, but more extreme.

With -c1 -m4G, observed really bad performance:

update: 1711.976196 [ms], preemption: {count: 22603, 99%: 0.943127 [ms], max: 1494.571776 [ms]}, cache: 2148/2906 [MB], alloc/comp: 1334/869 [MB] (amp: 0.651), pr/me/dr 1062186/0/1062187
cache: 2148/2906 [MB], memtable: 738/1024 [MB], alloc/comp: 993/0 [MB] (amp: 0.000)

Which means that max reactor stall during cache update was 1.5 [s]
0.7 GB memtables. 2.1 GB in cache.
2025-12-06 01:03:09 +01:00
Benny Halevy
19b6207f17 database: truncate_table_on_all_shards: set use_sstable_identifier to true
To facilitate global sstable deduplication on backup.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:39 +02:00
Benny Halevy
ff52550739 nodetool: snapshot: add --use-sstable-identifier option
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:39 +02:00
Benny Halevy
e654045755 api: storage_service: take_snapshot: add use_sstable_identifier option
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:39 +02:00
Benny Halevy
07b92a1ee8 test: database_test: add snapshot_use_sstable_identifier_works
Test that taking a snapshot with the use_sstable_identifier
option (and injecting `random_sstable_identifier`) produces
different file names in the snapshot than the original
sstable names and validate te manifest.json file respectively.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:38 +02:00
Benny Halevy
7504d10d9e test: database_test: snapshot_works: add validate_manifest
Validate the manifest.json format by loading it using rjson::parse
and then validate its contents to ensure it lists exactly the
SSTables present in the snapshot directory.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
28cb300d0a sstable: write_scylla_metadata: add random_sstable_identifier error injection
To be used by a unit test in the following patch for testing
the snapshot use_sstable_identifier option.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
9b3fbedc8c table: snapshot_on_all_shards: take snapshot_options
And pass the use_sstable_identifier down the stack
to the sstables layer.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
420fb1fd53 sstable: add get_format getter
To be used by the snapshot code in te following patch
for manufacturing a basename using the sstable_id rather
than its generation.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
7c62417b54 sstable: snapshot: add use_sstable_identifier option
When set to true, use the sstable_identifier as the sstable name
in the snapshot rather than its generation.

sstable::snapshot now returns the generation it used
for the sstable in the snapshot, based on the `use_sstable_identifier`
option, to be used by the upper layer generating the manifest.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:53:32 +02:00
Gleb Natapov
86dde50c0d direct_failure_detector: run direct failure detector in the gossiper scheduling group
When direct failure detector was introduces the idea was that it will
run on the same connection raft group0 verbs are running, but in
60f1053087 raft verbs were moved to run on the gossiper connection
while DIRECT_FD_PING was left where it was. This patch move it to
gossiper connection as well and fix the pinger code to run in gossiper
scheduling group.
2025-12-04 11:35:43 +02:00
Gleb Natapov
6a6bbbf1a6 raft: drop invoke_on from the pinger verb handler
Currently raft direct pinger verb jumps to shard 0 to check if group0 is
alive before replying. The verb runs relatively often, so it is not very
efficient. The patch distributes group0 liveness information (as it
changes) to all shard instead, so that the handler itself does not need
to jump to shard 0.
2025-12-04 11:35:43 +02:00
Benny Halevy
1c45ad7cee db: snapshot_ctl: snapshot_options: add use_sstable_identifier options
To be used for naming sstables in the snapshot by their
sstable identifiers rather than their generation, to
facilitate global deduplication of sstables in backup.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 09:46:35 +02:00
Benny Halevy
c18133b6cb db: snapshot_ctl: move skip_flush to struct snapshot_options
Prepare for adding another option: use_sstable_identifer.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 09:46:35 +02:00
Gleb Natapov
82f80478b8 direct_failure_detector: pass timeout to direct_fd_ping verb
Currently direct_fd_ping runs without timeout, but the verb is not
waited forever, the wait is canceled after a timeout, this timeout
simply is not passed to the rpc. It may create a situation where the
rpc callback can runs on a destination but it is no longer waited on.
Change the code to pass timeout to rpc as well and return earlier from
the rpc handler if the timeout is reached by the time the callback is
called. This is backwards compatible since timeout is passed as
optional.
2025-12-02 14:55:20 +02:00
70 changed files with 1180 additions and 1212 deletions

1
.github/CODEOWNERS vendored
View File

@@ -57,7 +57,6 @@ repair/* @tgrabiec @asias
# SCHEMA MANAGEMENT
db/schema_tables* @tgrabiec
db/legacy_schema_migrator* @tgrabiec
service/migration* @tgrabiec
schema* @tgrabiec

View File

@@ -2739,7 +2739,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
auto read_command = needs_read_before_write ?
previous_item_read_command(proxy, schema(), _ck, selection) :
nullptr;
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
if (!is_applied) {
@@ -3031,12 +3031,15 @@ struct primary_key_equal {
// done is known prior to starting the operation). Nevertheless, we want to
// do this mutation via LWT to ensure that it is serialized with other LWT
// mutations to the same partition.
//
// The std::vector<put_or_delete_item> must remain alive until the
// storage_proxy::cas() future is resolved.
class put_or_delete_item_cas_request : public service::cas_request {
schema_ptr schema;
std::vector<put_or_delete_item> _mutation_builders;
const std::vector<put_or_delete_item>& _mutation_builders;
public:
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
schema(std::move(s)), _mutation_builders(std::move(b)) { }
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
schema(std::move(s)), _mutation_builders(b) { }
virtual ~put_or_delete_item_cas_request() = default;
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
std::optional<mutation> ret;
@@ -3052,19 +3055,20 @@ public:
}
};
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).discard_result();
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
// does not need to support conditional updates.
}
@@ -3131,30 +3135,34 @@ static future<> do_batch_write(service::storage_proxy& proxy,
// Multiple mutations may be destined for the same partition, adding
// or deleting different items of one partition. Join them together
// because we can do them in one cas() call.
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
using map_type = std::unordered_map<schema_decorated_key,
std::vector<put_or_delete_item>,
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
mb = e.second,
dk = e.first.dk,
&mb = e.second,
&dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
@@ -3168,11 +3176,11 @@ static future<> do_batch_write(service::storage_proxy& proxy,
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
});
}).finally([key_builders = std::move(key_builders)]{});
}
}

View File

@@ -729,6 +729,14 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"use_sstable_identifier",
"description":"Use the sstable identifier UUID, if available, rather than the sstable generation.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
},

View File

@@ -2020,12 +2020,16 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
auto usiopt = req->get_query_param("use_sstable_identifier");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
.use_sstable_identifier = strcasecmp(usiopt.c_str(), "true") == 0
};
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
try {
if (column_families.empty()) {
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
@@ -2033,7 +2037,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
}
co_return json_void();
} catch (...) {
@@ -2068,7 +2072,8 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
}
compaction::compaction_stats stats;

View File

@@ -146,7 +146,8 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();

View File

@@ -8,6 +8,7 @@
*/
#include "auth/certificate_authenticator.hh"
#include "auth/cache.hh"
#include <boost/regex.hpp>
#include <fmt/ranges.h>
@@ -34,13 +35,14 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();

View File

@@ -26,13 +26,15 @@ class raft_group0_client;
namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
~certificate_authenticator();
future<> start() override;

View File

@@ -1062,7 +1062,6 @@ scylla_core = (['message/messaging_service.cc',
'db/hints/resource_manager.cc',
'db/hints/sync_point.cc',
'db/large_data_handler.cc',
'db/legacy_schema_migrator.cc',
'db/marshal/type_parser.cc',
'db/per_partition_rate_limit_options.cc',
'db/rate_limiter.cc',

View File

@@ -331,7 +331,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (!cl_for_paxos) [[unlikely]] {
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(std::move(cl_for_paxos).assume_error());
}
seastar::shared_ptr<cas_request> request;
std::unique_ptr<cas_request> request;
schema_ptr schema;
db::timeout_clock::time_point now = db::timeout_clock::now();
@@ -354,9 +354,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (keys.empty()) {
continue;
}
if (request.get() == nullptr) {
if (!request) {
schema = statement.s;
request = seastar::make_shared<cas_request>(schema, std::move(keys));
request = std::make_unique<cas_request>(schema, std::move(keys));
} else if (keys.size() != 1 || keys.front().equal(request->key().front(), dht::ring_position_comparator(*schema)) == false) {
throw exceptions::invalid_request_exception("BATCH with conditions cannot span multiple partitions");
}
@@ -366,7 +366,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
request->add_row_update(statement, std::move(ranges), std::move(json_cache), statement_options);
}
if (request.get() == nullptr) {
if (!request) {
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
}
@@ -377,9 +377,10 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
);
}
return qp.proxy().cas(schema, std::move(cas_shard), request, request->read_command(qp), request->key(),
auto* request_ptr = request.get();
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
});
}

View File

@@ -401,7 +401,8 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
type.is_update() ? "update" : "deletion"));
}
auto request = seastar::make_shared<cas_request>(s, std::move(keys));
auto request = std::make_unique<cas_request>(s, std::move(keys));
auto* request_ptr = request.get();
// cas_request can be used for batches as well single statements; Here we have just a single
// modification in the list of CAS commands, since we're handling single-statement execution.
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
@@ -427,9 +428,9 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
tablet_info = erm->check_locality(token);
}
return qp.proxy().cas(s, std::move(cas_shard), request, request->read_command(qp), request->key(),
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
result->add_tablet_info(tablet_replicas, token_range);
return result;

View File

@@ -10,7 +10,6 @@ target_sources(db
schema_applier.cc
schema_tables.cc
cql_type_parser.cc
legacy_schema_migrator.cc
commitlog/commitlog.cc
commitlog/commitlog_replayer.cc
commitlog/commitlog_entry.cc

View File

@@ -1,602 +0,0 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
// Since Scylla 2.0, we use system tables whose schemas were introduced in
// Cassandra 3. If Scylla boots to find a data directory with system tables
// with older schemas - produced by pre-2.0 Scylla or by pre-3.0 Cassandra,
// we need to migrate these old tables to the new format.
//
// We provide here a function, db::legacy_schema_migrator::migrate(),
// for a one-time migration from old to new system tables. The function
// reads old system tables, write them back in the new format, and finally
// delete the old system tables. Scylla's main should call this function and
// wait for the returned future, before starting to serve the database.
#include <boost/iterator/filter_iterator.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/util/log.hh>
#include <map>
#include <unordered_set>
#include <chrono>
#include "replica/database.hh"
#include "legacy_schema_migrator.hh"
#include "system_keyspace.hh"
#include "schema_tables.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "utils/rjson.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/util.hh"
#include "cql3/statements/property_definitions.hh"
static seastar::logger mlogger("legacy_schema_migrator");
namespace db {
namespace legacy_schema_migrator {
// local data carriers
class migrator {
public:
static const std::unordered_set<sstring> legacy_schema_tables;
migrator(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp)
: _sp(sp), _db(db), _sys_ks(sys_ks), _qp(qp) {
}
migrator(migrator&&) = default;
typedef db_clock::time_point time_point;
// TODO: we don't support triggers.
// this is a placeholder.
struct trigger {
time_point timestamp;
sstring name;
std::unordered_map<sstring, sstring> options;
};
struct table {
time_point timestamp;
schema_ptr metadata;
std::vector<trigger> triggers;
};
struct type {
time_point timestamp;
user_type metadata;
};
struct function {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
bool called_on_null_input;
sstring language;
sstring body;
};
struct aggregate {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
sstring final_func;
sstring initcond;
sstring state_func;
sstring state_type;
};
struct keyspace {
time_point timestamp;
sstring name;
bool durable_writes;
std::map<sstring, sstring> replication_params;
std::vector<table> tables;
std::vector<type> types;
std::vector<function> functions;
std::vector<aggregate> aggregates;
};
class unsupported_feature : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
static sstring fmt_query(const char* fmt, const char* table) {
return fmt::format(fmt::runtime(fmt), db::system_keyspace::NAME, table);
}
typedef ::shared_ptr<cql3::untyped_result_set> result_set_type;
typedef const cql3::untyped_result_set::row row_type;
future<> read_table(keyspace& dst, sstring cf_name, time_point timestamp) {
auto fmt = "SELECT * FROM {}.{} WHERE keyspace_name = ? AND columnfamily_name = ?";
auto tq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNFAMILIES);
auto cq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNS);
auto zq = fmt_query(fmt, db::system_keyspace::legacy::TRIGGERS);
typedef std::tuple<future<result_set_type>, future<result_set_type>, future<result_set_type>, future<db::schema_tables::legacy::schema_mutations>> result_tuple;
return when_all(_qp.execute_internal(tq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(cq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(zq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
db::schema_tables::legacy::read_table_mutations(_sp, dst.name, cf_name, db::system_keyspace::legacy::column_families()))
.then([&dst, cf_name, timestamp](result_tuple&& t) {
result_set_type tables = std::get<0>(t).get();
result_set_type columns = std::get<1>(t).get();
result_set_type triggers = std::get<2>(t).get();
db::schema_tables::legacy::schema_mutations sm = std::get<3>(t).get();
row_type& td = tables->one();
auto ks_name = td.get_as<sstring>("keyspace_name");
auto cf_name = td.get_as<sstring>("columnfamily_name");
auto id = table_id(td.get_or("cf_id", generate_legacy_id(ks_name, cf_name).uuid()));
schema_builder builder(dst.name, cf_name, id);
builder.with_version(sm.digest());
cf_type cf = sstring_to_cf_type(td.get_or("type", sstring("standard")));
if (cf == cf_type::super) {
fail(unimplemented::cause::SUPER);
}
auto comparator = td.get_as<sstring>("comparator");
bool is_compound = cell_comparator::check_compound(comparator);
builder.set_is_compound(is_compound);
cell_comparator::read_collections(builder, comparator);
bool filter_sparse = false;
data_type default_validator = {};
if (td.has("default_validator")) {
default_validator = db::schema_tables::parse_type(td.get_as<sstring>("default_validator"));
if (default_validator->is_counter()) {
builder.set_is_counter(true);
}
builder.set_default_validation_class(default_validator);
}
/*
* Determine whether or not the table is *really* dense
* We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively),
* but we can trust is_dense value of false.
*/
auto is_dense = td.get_opt<bool>("is_dense");
if (!is_dense || *is_dense) {
is_dense = [&] {
/*
* As said above, this method is only here because we need to deal with thrift upgrades.
* Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
* then we'll have saved the "is_dense" value and will be good to go.
*
* But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
* to infer that information without relying on it in that case. And for the most part this is
* easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
* having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
* PRIMARY KEY defined.
*
* So we need to recognize those special case CQL3 table with only a primary key. If we have some
* clustering columns, we're fine as said above. So the only problem is that we cannot decide for
* sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
* has been created in CQL3 by say:
* CREATE TABLE test (k int PRIMARY KEY)
* in which case it should not be dense. However, we can limit our margin of error by assuming we are
* in the latter case only if the comparator is exactly CompositeType(UTF8Type).
*/
std::optional<column_id> max_cl_idx;
const cql3::untyped_result_set::row * regular = nullptr;
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
if (kind_str == "compact_value") {
continue;
}
auto kind = db::schema_tables::deserialize_kind(kind_str);
if (kind == column_kind::regular_column) {
if (regular != nullptr) {
return false;
}
regular = &row;
continue;
}
if (kind == column_kind::clustering_key) {
max_cl_idx = std::max(column_id(row.get_or("component_index", 0)), max_cl_idx.value_or(column_id()));
}
}
auto is_cql3_only_pk_comparator = [](const sstring& comparator) {
if (!cell_comparator::check_compound(comparator)) {
return false;
}
// CMH. We don't have composites, nor a parser for it. This is a simple way of c
// checking the same.
auto comma = comparator.find(',');
if (comma != sstring::npos) {
return false;
}
auto off = comparator.find('(');
auto end = comparator.find(')');
return comparator.compare(off, end - off, utf8_type->name()) == 0;
};
if (max_cl_idx) {
auto n = std::count(comparator.begin(), comparator.end(), ','); // num comp - 1
return *max_cl_idx == n;
}
if (regular) {
return false;
}
return !is_cql3_only_pk_comparator(comparator);
}();
// now, if switched to sparse, remove redundant compact_value column and the last clustering column,
// directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
filter_sparse = !*is_dense;
}
builder.set_is_dense(*is_dense);
auto is_cql = !*is_dense && is_compound;
auto is_static_compact = !*is_dense && !is_compound;
// org.apache.cassandra.schema.LegacySchemaMigrator#isEmptyCompactValueColumn
auto is_empty_compact_value = [](const cql3::untyped_result_set::row& column_row) {
auto kind_str = column_row.get_as<sstring>("type");
// Cassandra only checks for "compact_value", but Scylla generates "regular" instead (#2586)
return (kind_str == "compact_value" || kind_str == "regular")
&& column_row.get_as<sstring>("column_name").empty();
};
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
auto kind = db::schema_tables::deserialize_kind(kind_str);
auto component_index = kind > column_kind::clustering_key ? 0 : column_id(row.get_or("component_index", 0));
auto name = row.get_or<sstring>("column_name", sstring());
auto validator = db::schema_tables::parse_type(row.get_as<sstring>("validator"));
if (is_empty_compact_value(row)) {
continue;
}
if (filter_sparse) {
if (kind_str == "compact_value") {
continue;
}
if (kind == column_kind::clustering_key) {
if (cf == cf_type::super && component_index != 0) {
continue;
}
if (cf != cf_type::super && !is_compound) {
continue;
}
}
}
std::optional<index_metadata_kind> index_kind;
sstring index_name;
index_options_map options;
if (row.has("index_type")) {
index_kind = schema_tables::deserialize_index_kind(row.get_as<sstring>("index_type"));
}
if (row.has("index_name")) {
index_name = row.get_as<sstring>("index_name");
}
if (row.has("index_options")) {
sstring index_options_str = row.get_as<sstring>("index_options");
options = rjson::parse_to_map<index_options_map>(std::string_view(index_options_str));
sstring type;
auto i = options.find("index_keys");
if (i != options.end()) {
options.erase(i);
type = "KEYS";
}
i = options.find("index_keys_and_values");
if (i != options.end()) {
options.erase(i);
type = "KEYS_AND_VALUES";
}
if (type.empty()) {
if (validator->is_collection() && validator->is_multi_cell()) {
type = "FULL";
} else {
type = "VALUES";
}
}
auto column = cql3::util::maybe_quote(name);
options["target"] = validator->is_collection()
? type + "(" + column + ")"
: column;
}
if (index_kind) {
// Origin assumes index_name is always set, so let's do the same
builder.with_index(index_metadata(index_name, options, *index_kind, index_metadata::is_local_index::no));
}
data_type column_name_type = [&] {
if (is_static_compact && kind == column_kind::regular_column) {
return db::schema_tables::parse_type(comparator);
}
return utf8_type;
}();
auto column_name = [&] {
try {
return column_name_type->from_string(name);
} catch (marshal_exception&) {
// #2597: Scylla < 2.0 writes names in serialized form, try to recover
column_name_type->validate(to_bytes_view(name));
return to_bytes(name);
}
}();
builder.with_column_ordered(column_definition(std::move(column_name), std::move(validator), kind, component_index));
}
if (is_static_compact) {
builder.set_regular_column_name_type(db::schema_tables::parse_type(comparator));
}
if (td.has("gc_grace_seconds")) {
builder.set_gc_grace_seconds(td.get_as<int32_t>("gc_grace_seconds"));
}
if (td.has("min_compaction_threshold")) {
builder.set_min_compaction_threshold(td.get_as<int32_t>("min_compaction_threshold"));
}
if (td.has("max_compaction_threshold")) {
builder.set_max_compaction_threshold(td.get_as<int32_t>("max_compaction_threshold"));
}
if (td.has("comment")) {
builder.set_comment(td.get_as<sstring>("comment"));
}
if (td.has("memtable_flush_period_in_ms")) {
builder.set_memtable_flush_period(td.get_as<int32_t>("memtable_flush_period_in_ms"));
}
if (td.has("caching")) {
builder.set_caching_options(caching_options::from_sstring(td.get_as<sstring>("caching")));
}
if (td.has("default_time_to_live")) {
builder.set_default_time_to_live(gc_clock::duration(td.get_as<int32_t>("default_time_to_live")));
}
if (td.has("speculative_retry")) {
builder.set_speculative_retry(td.get_as<sstring>("speculative_retry"));
}
if (td.has("compaction_strategy_class")) {
auto strategy = td.get_as<sstring>("compaction_strategy_class");
try {
builder.set_compaction_strategy(compaction::compaction_strategy::type(strategy));
} catch (const exceptions::configuration_exception& e) {
// If compaction strategy class isn't supported, fallback to incremental.
mlogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
}
}
if (td.has("compaction_strategy_options")) {
sstring strategy_options_str = td.get_as<sstring>("compaction_strategy_options");
builder.set_compaction_strategy_options(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options_str)));
}
auto comp_param = td.get_as<sstring>("compression_parameters");
compression_parameters cp(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(comp_param)));
builder.set_compressor_params(cp);
if (td.has("min_index_interval")) {
builder.set_min_index_interval(td.get_as<int32_t>("min_index_interval"));
} else if (td.has("index_interval")) { // compatibility
builder.set_min_index_interval(td.get_as<int32_t>("index_interval"));
}
if (td.has("max_index_interval")) {
builder.set_max_index_interval(td.get_as<int32_t>("max_index_interval"));
}
if (td.has("bloom_filter_fp_chance")) {
builder.set_bloom_filter_fp_chance(td.get_as<double>("bloom_filter_fp_chance"));
} else {
builder.set_bloom_filter_fp_chance(builder.get_bloom_filter_fp_chance());
}
if (td.has("dropped_columns")) {
auto map = td.get_map<sstring, int64_t>("dropped_columns");
for (auto&& e : map) {
builder.without_column(e.first, api::timestamp_type(e.second));
};
}
// ignore version. we're transient
if (!triggers->empty()) {
throw unsupported_feature("triggers");
}
dst.tables.emplace_back(table{timestamp, builder.build() });
});
}
future<> read_tables(keyspace& dst) {
auto query = fmt_query("SELECT columnfamily_name, writeTime(type) AS timestamp FROM {}.{} WHERE keyspace_name = ?",
db::system_keyspace::legacy::COLUMNFAMILIES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
return read_table(dst, row.get_as<sstring>("columnfamily_name"), row.get_as<time_point>("timestamp"));
}).finally([result] {});
});
}
future<time_point> read_type_timestamp(keyspace& dst, sstring type_name) {
// TODO: Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
// use the writeTime() CQL function, and must resort to a lower level.
// Origin digs up the actual cells of target partition and gets timestamp from there.
// We should do the same, but g-dam that's messy. Lets give back dung value for now.
return make_ready_future<time_point>(dst.timestamp);
}
future<> read_types(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
auto name = row.get_blob_unfragmented("type_name");
auto columns = row.get_list<bytes>("field_names");
auto types = row.get_list<sstring>("field_types");
std::vector<data_type> field_types;
for (auto&& value : types) {
field_types.emplace_back(db::schema_tables::parse_type(value));
}
auto ut = user_type_impl::get_instance(dst.name, name, columns, field_types, false);
return read_type_timestamp(dst, value_cast<sstring>(utf8_type->deserialize(name))).then([ut = std::move(ut), &dst](time_point timestamp) {
dst.types.emplace_back(type{timestamp, ut});
});
}).finally([result] {});
});
}
future<> read_functions(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::FUNCTIONS);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("functions");
}
});
}
future<> read_aggregates(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::AGGREGATES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("aggregates");
}
});
}
future<keyspace> read_keyspace(sstring ks_name, bool durable_writes, sstring strategy_class, sstring strategy_options, time_point timestamp) {
auto map = rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options));
map.emplace("class", std::move(strategy_class));
auto ks = ::make_lw_shared<keyspace>(keyspace{timestamp, std::move(ks_name), durable_writes, std::move(map) });
return read_tables(*ks).then([this, ks] {
//Collection<Type> types = readTypes(keyspaceName);
return read_types(*ks);
}).then([this, ks] {
return read_functions(*ks);
}).then([this, ks] {
return read_aggregates(*ks);
}).then([ks] {
return make_ready_future<keyspace>(std::move(*ks));
});
}
future<> read_all_keyspaces() {
static auto ks_filter = [](row_type& row) {
auto ks_name = row.get_as<sstring>("keyspace_name");
return ks_name != db::system_keyspace::NAME && ks_name != db::schema_tables::v3::NAME;
};
auto query = fmt_query("SELECT keyspace_name, durable_writes, strategy_options, strategy_class, writeTime(durable_writes) AS timestamp FROM {}.{}",
db::system_keyspace::legacy::KEYSPACES);
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this](result_set_type result) {
auto i = boost::make_filter_iterator(ks_filter, result->begin(), result->end());
auto e = boost::make_filter_iterator(ks_filter, result->end(), result->end());
return parallel_for_each(i, e, [this](row_type& row) {
return read_keyspace(row.get_as<sstring>("keyspace_name")
, row.get_as<bool>("durable_writes")
, row.get_as<sstring>("strategy_class")
, row.get_as<sstring>("strategy_options")
, row.get_as<db_clock::time_point>("timestamp")
).then([this](keyspace ks) {
_keyspaces.emplace_back(std::move(ks));
});
}).finally([result] {});
});
}
future<> drop_legacy_tables() {
mlogger.info("Dropping legacy schema tables");
auto with_snapshot = !_keyspaces.empty();
for (const sstring& cfname : legacy_schema_tables) {
co_await replica::database::legacy_drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
}
}
future<> store_keyspaces_in_new_schema_tables() {
mlogger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
_keyspaces.size(), db::schema_tables::v3::NAME);
utils::chunked_vector<mutation> mutations;
for (auto& ks : _keyspaces) {
auto ksm = ::make_lw_shared<keyspace_metadata>(ks.name
, ks.replication_params["class"] // TODO, make ksm like c3?
, cql3::statements::property_definitions::to_extended_map(ks.replication_params)
, std::nullopt
, std::nullopt
, ks.durable_writes);
// we want separate time stamps for tables/types, so cannot bulk them into the ksm.
for (auto&& m : db::schema_tables::make_create_keyspace_mutations(schema_features::full(), ksm, ks.timestamp.time_since_epoch().count(), false)) {
mutations.emplace_back(std::move(m));
}
for (auto& t : ks.tables) {
db::schema_tables::add_table_or_view_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), true, mutations);
}
for (auto& t : ks.types) {
db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
}
}
return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
}
future<> flush_schemas() {
auto& db = _qp.db().real_database().container();
return replica::database::flush_tables_on_all_shards(db, db::schema_tables::all_table_infos(schema_features::full()));
}
future<> migrate() {
return read_all_keyspaces().then([this]() {
// write metadata to the new schema tables
return store_keyspaces_in_new_schema_tables()
.then(std::bind(&migrator::flush_schemas, this))
.then(std::bind(&migrator::drop_legacy_tables, this))
.then([] { mlogger.info("Completed migration of legacy schema tables"); });
});
}
sharded<service::storage_proxy>& _sp;
sharded<replica::database>& _db;
sharded<db::system_keyspace>& _sys_ks;
cql3::query_processor& _qp;
std::vector<keyspace> _keyspaces;
};
const std::unordered_set<sstring> migrator::legacy_schema_tables = {
db::system_keyspace::legacy::KEYSPACES,
db::system_keyspace::legacy::COLUMNFAMILIES,
db::system_keyspace::legacy::COLUMNS,
db::system_keyspace::legacy::TRIGGERS,
db::system_keyspace::legacy::USERTYPES,
db::system_keyspace::legacy::FUNCTIONS,
db::system_keyspace::legacy::AGGREGATES,
};
}
}
future<>
db::legacy_schema_migrator::migrate(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp) {
return do_with(migrator(sp, db, sys_ks, qp), std::bind(&migrator::migrate, std::placeholders::_1));
}

View File

@@ -1,37 +0,0 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "seastarx.hh"
namespace replica {
class database;
}
namespace cql3 {
class query_processor;
}
namespace service {
class storage_proxy;
}
namespace db {
class system_keyspace;
namespace legacy_schema_migrator {
future<> migrate(sharded<service::storage_proxy>&, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor&);
}
}

View File

@@ -542,6 +542,7 @@ public:
// Returns the range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
// Excludes the range for the row itself. That information is returned by range_tombstone_for_row().
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
// Note that this is different from the meaning of rows_entry::range_tombstone(), which includes the row itself.
tombstone range_tombstone() const { return _range_tombstone; }
// Can be called when cursor is pointing at a row.

View File

@@ -1287,6 +1287,15 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
, _partitions(dht::raw_token_less_comparator{})
, _underlying(src())
, _snapshot_source(std::move(src))
, _update_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.update {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _populate_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.populate {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _read_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.read {}.{}", _schema->ks_name(), _schema->cf_name());
}))
{
try {
with_allocator(_tracker.allocator(), [this, cont] {

View File

@@ -404,10 +404,7 @@ const std::unordered_set<table_id>& schema_tables_holding_schema_mutations() {
computed_columns(),
dropped_columns(),
indexes(),
scylla_tables(),
db::system_keyspace::legacy::column_families(),
db::system_keyspace::legacy::columns(),
db::system_keyspace::legacy::triggers()}) {
scylla_tables()}) {
SCYLLA_ASSERT(s->clustering_key_size() > 0);
auto&& first_column_name = s->clustering_column_at(0).name_as_text();
SCYLLA_ASSERT(first_column_name == "table_name"
@@ -2840,26 +2837,6 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
}
namespace legacy {
table_schema_version schema_mutations::digest() const {
md5_hasher h;
const db::schema_features no_features;
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s)
{
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
}
} // namespace legacy
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);

View File

@@ -155,24 +155,6 @@ schema_ptr scylla_table_schema_history();
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
}
namespace legacy {
class schema_mutations {
mutation _columnfamilies;
mutation _columns;
public:
schema_mutations(mutation columnfamilies, mutation columns)
: _columnfamilies(std::move(columnfamilies))
, _columns(std::move(columns))
{ }
table_schema_version digest() const;
};
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s);
}
struct qualified_name {
sstring keyspace_name;
sstring table_name;

View File

@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
});
}
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
if (tag.empty()) {
throw std::runtime_error("You must supply a snapshot name.");
}
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
};
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
});
}
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
});
}
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (ks_name.empty()) {
throw std::runtime_error("You must supply a keyspace name");
}
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
throw std::runtime_error("You must supply a snapshot name.");
}
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
});
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
}
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {

View File

@@ -38,10 +38,13 @@ class backup_task_impl;
} // snapshot namespace
struct snapshot_options {
bool skip_flush = false;
bool use_sstable_identifier = false;
};
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
public:
using skip_flush = bool_class<class skip_flush_tag>;
struct table_snapshot_details {
int64_t total;
int64_t live;
@@ -70,8 +73,8 @@ public:
*
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
return take_snapshot(tag, {}, sf);
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
return take_snapshot(tag, {}, opts);
}
/**
@@ -80,7 +83,7 @@ public:
* @param tag the tag given to the snapshot; may not be null or empty
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
*/
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
/**
* Takes the snapshot of multiple tables. A snapshot name must be specified.
@@ -89,7 +92,7 @@ public:
* @param tables a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Remove the snapshot with the given name from the given keyspaces.
@@ -127,8 +130,8 @@ private:
friend class snapshot::backup_task_impl;
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
};
}

View File

@@ -137,6 +137,8 @@ namespace {
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
// repair tasks
system_keyspace::REPAIR_TASKS,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -462,6 +464,24 @@ schema_ptr system_keyspace::repair_history() {
return schema;
}
schema_ptr system_keyspace::repair_tasks() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
.with_column("task_uuid", uuid_type, column_kind::partition_key)
.with_column("operation", utf8_type, column_kind::clustering_key)
// First and last token for of the tablet
.with_column("first_token", long_type, column_kind::clustering_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("timestamp", timestamp_type)
.with_column("table_uuid", uuid_type, column_kind::static_column)
.set_comment("Record tablet repair tasks")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -847,8 +867,6 @@ schema_ptr system_keyspace::corrupt_data() {
return corrupt_data;
}
static constexpr auto schema_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
/*static*/ schema_ptr system_keyspace::scylla_local() {
static thread_local auto scylla_local = [] {
schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL,
@@ -1360,289 +1378,6 @@ schema_ptr system_keyspace::role_permissions() {
return schema;
}
schema_ptr system_keyspace::legacy::hints() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
// partition key
{{"target_id", uuid_type}},
// clustering key
{{"hint_id", timeuuid_type}, {"message_version", int32_type}},
// regular columns
{{"mutation", bytes_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* hints awaiting delivery"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"enabled", "false"}});
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::batchlog() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
// partition key
{{"id", uuid_type}},
// clustering key
{},
// regular columns
{{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* batchlog entries"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::keyspaces() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, KEYSPACES), NAME, KEYSPACES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{},
// regular columns
{
{"durable_writes", boolean_type},
{"strategy_class", utf8_type},
{"strategy_options", utf8_type}
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* keyspace definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::column_families() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNFAMILIES), NAME, COLUMNFAMILIES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}},
// regular columns
{
{"bloom_filter_fp_chance", double_type},
{"caching", utf8_type},
{"cf_id", uuid_type},
{"comment", utf8_type},
{"compaction_strategy_class", utf8_type},
{"compaction_strategy_options", utf8_type},
{"comparator", utf8_type},
{"compression_parameters", utf8_type},
{"default_time_to_live", int32_type},
{"default_validator", utf8_type},
{"dropped_columns", map_type_impl::get_instance(utf8_type, long_type, true)},
{"gc_grace_seconds", int32_type},
{"is_dense", boolean_type},
{"key_validator", utf8_type},
{"max_compaction_threshold", int32_type},
{"max_index_interval", int32_type},
{"memtable_flush_period_in_ms", int32_type},
{"min_compaction_threshold", int32_type},
{"min_index_interval", int32_type},
{"speculative_retry", utf8_type},
{"subcomparator", utf8_type},
{"type", utf8_type},
// The following 4 columns are only present up until 2.1.8 tables
{"key_aliases", utf8_type},
{"value_alias", utf8_type},
{"column_aliases", utf8_type},
{"index_interval", int32_type},},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* table definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::columns() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNS), NAME, COLUMNS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"column_name", utf8_type}},
// regular columns
{
{"component_index", int32_type},
{"index_name", utf8_type},
{"index_options", utf8_type},
{"index_type", utf8_type},
{"type", utf8_type},
{"validator", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"column definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::triggers() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, TRIGGERS), NAME, TRIGGERS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"trigger_name", utf8_type}},
// regular columns
{
{"trigger_options", map_type_impl::get_instance(utf8_type, utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"trigger definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::usertypes() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, USERTYPES), NAME, USERTYPES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"type_name", utf8_type}},
// regular columns
{
{"field_names", list_type_impl::get_instance(utf8_type, true)},
{"field_types", list_type_impl::get_instance(utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::functions() {
/**
* Note: we have our own "legacy" version of this table (in schema_tables),
* but it is (afaik) not used, and differs slightly from the origin one.
* This is based on the origin schema, since we're more likely to encounter
* installations of that to migrate, rather than our own (if we dont use the table).
*/
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, FUNCTIONS), NAME, FUNCTIONS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"function_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_names", list_type_impl::get_instance(utf8_type, true)},
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"body", utf8_type},
{"language", utf8_type},
{"return_type", utf8_type},
{"called_on_null_input", boolean_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::aggregates() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, AGGREGATES), NAME, AGGREGATES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"aggregate_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"final_func", utf8_type},
{"initcond", bytes_type},
{"return_type", utf8_type},
{"state_func", utf8_type},
{"state_type", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined aggregate definition"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::dicts() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, DICTS);
@@ -2596,6 +2331,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
repair_tasks(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
@@ -2615,13 +2351,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
r.insert(r.end(), {sstables_registry()});
}
// legacy schema
r.insert(r.end(), {
// TODO: once we migrate hints/batchlog and add converter
// legacy::hints(), legacy::batchlog(),
legacy::keyspaces(), legacy::column_families(),
legacy::columns(), legacy::triggers(), legacy::usertypes(),
legacy::functions(), legacy::aggregates(), });
return r;
}
@@ -2844,6 +2573,32 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
});
}
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
// Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
constexpr int ttl = 10 * 24 * 3600;
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
utils::chunked_vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
co_return cmuts;
}
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_task_entry ent;
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
ent.first_token = row.get_as<int64_t>("first_token");
ent.last_token = row.get_as<int64_t>("last_token");
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
co_await f(std::move(ent));
co_return stop_iteration::no;
});
}
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
@@ -4015,4 +3770,35 @@ future<> system_keyspace::apply_mutation(mutation m) {
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
{system_keyspace::repair_task_operation::requested, "requested"},
{system_keyspace::repair_task_operation::finished, "finished"},
};
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
for (auto&& [v, s] : repair_task_operation_to_name) {
result.emplace(s, v);
}
return result;
});
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
auto i = repair_task_operation_to_name.find(op);
if (i == repair_task_operation_to_name.end()) {
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
}
return i->second;
}
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
return repair_task_operation_from_name.at(name);
}
} // namespace db
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
}

View File

@@ -57,6 +57,8 @@ namespace paxos {
struct topology_request_state;
class group0_guard;
class raft_group0_client;
}
namespace netw {
@@ -184,6 +186,7 @@ public:
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
@@ -241,28 +244,6 @@ public:
static schema_ptr cdc_local();
};
struct legacy {
static constexpr auto HINTS = "hints";
static constexpr auto BATCHLOG = "batchlog";
static constexpr auto KEYSPACES = "schema_keyspaces";
static constexpr auto COLUMNFAMILIES = "schema_columnfamilies";
static constexpr auto COLUMNS = "schema_columns";
static constexpr auto TRIGGERS = "schema_triggers";
static constexpr auto USERTYPES = "schema_usertypes";
static constexpr auto FUNCTIONS = "schema_functions";
static constexpr auto AGGREGATES = "schema_aggregates";
static schema_ptr keyspaces();
static schema_ptr column_families();
static schema_ptr columns();
static schema_ptr triggers();
static schema_ptr usertypes();
static schema_ptr functions();
static schema_ptr aggregates();
static schema_ptr hints();
static schema_ptr batchlog();
};
// Partition estimates for a given range of tokens.
struct range_estimates {
schema_ptr schema;
@@ -282,6 +263,7 @@ public:
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
@@ -420,6 +402,22 @@ public:
int64_t range_end;
};
enum class repair_task_operation {
requested,
finished,
};
static sstring repair_task_operation_to_string(repair_task_operation op);
static repair_task_operation repair_task_operation_from_string(const sstring& name);
struct repair_task_entry {
tasks::task_id task_uuid;
repair_task_operation operation;
int64_t first_token;
int64_t last_token;
db_clock::time_point timestamp;
table_id table_uuid;
};
struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
@@ -441,6 +439,10 @@ public:
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
future<> get_repair_history(table_id, repair_history_consumer f);
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
future<replay_positions> get_truncated_positions(table_id);
future<> drop_truncation_rp_records();
@@ -748,3 +750,8 @@ public:
}; // class system_keyspace
} // namespace db
template <>
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -1 +1 @@
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"

View File

@@ -110,7 +110,6 @@ To display the log classes (output changes with each version so your display may
keys
keyspace_utils
large_data
legacy_schema_migrator
lister
load_balancer
load_broadcaster

View File

@@ -17,7 +17,7 @@ SYNOPSIS
[(-u <username> | --username <username>)] snapshot
[(-cf <table> | --column-family <table> | --table <table>)]
[(-kc <kclist> | --kc.list <kclist>)]
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
[(-sf | --skip-flush)] [--use-sstable-identifier] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
OPTIONS
.......
@@ -37,6 +37,8 @@ Parameter Descriptio
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
--use-sstable-identifier Use the sstable identifier UUID, if available, rather than the sstable generation.
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-t <tag> / --tag <tag> The name of the snapshot
==================================================================== =====================================================================================

8
docs/poetry.lock generated
View File

@@ -1018,14 +1018,14 @@ sphinx-markdown-tables = "0.0.17"
[[package]]
name = "sphinx-scylladb-theme"
version = "1.8.9"
version = "1.8.10"
description = "A Sphinx Theme for ScyllaDB documentation projects"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "sphinx_scylladb_theme-1.8.9-py3-none-any.whl", hash = "sha256:f8649a7753a29494fd2b417d1cb855035dddb9ebd498ea033fd73f5f9338271e"},
{file = "sphinx_scylladb_theme-1.8.9.tar.gz", hash = "sha256:ab7cda4c10a0d067c5c3a45f7b1f68cb8ebefe135a0be0738bfa282a344769b6"},
{file = "sphinx_scylladb_theme-1.8.10-py3-none-any.whl", hash = "sha256:8b930f33bec7308ccaa92698ebb5ad85059bcbf93a463f92917aeaf473fce632"},
{file = "sphinx_scylladb_theme-1.8.10.tar.gz", hash = "sha256:8a78a9b692d9a946be2c4a64aa472fd82204cc8ea0b1ee7f60de6db35b356326"},
]
[package.dependencies]
@@ -1603,4 +1603,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "74912627a3f424290ed7889451c0bdb1a862ab85b1d07c85f4f3b8c34f32a020"
content-hash = "0ae673106f45d3465cbdabbf511e165ca44feadd34d7753f2e68093afaa95c79"

View File

@@ -9,7 +9,7 @@ package-mode = false
python = "^3.10"
pygments = "^2.18.0"
redirects_cli ="^0.1.3"
sphinx-scylladb-theme = "^1.8.9"
sphinx-scylladb-theme = "^1.8.10"
sphinx-sitemap = "^2.6.0"
sphinx-autobuild = "^2024.4.19"
Sphinx = "^7.3.7"

View File

@@ -143,6 +143,7 @@ public:
gms::feature tablet_incremental_repair { *this, "TABLET_INCREMENTAL_REPAIR"sv };
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
gms::feature tablet_repair_tasks_table { *this, "TABLET_REPAIR_TASKS_TABLE"sv };
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };

View File

@@ -129,6 +129,6 @@ struct direct_fd_ping_reply {
std::variant<std::monostate, service::wrong_destination, service::group_liveness_info> result;
};
verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
verb [[with_client_info, with_timeout, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
} // namespace service

View File

@@ -39,7 +39,6 @@
#include "api/api_init.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/legacy_schema_migrator.hh"
#include "service/storage_service.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
@@ -1641,7 +1640,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
fd.start(
std::ref(fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count()).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count(), dbcfg.gossip_scheduling_group).get();
auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
fd.stop().get();
@@ -1851,8 +1850,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
group0_client.init().get();
checkpoint(stop_signal, "initializing system schema");
// schema migration, if needed, is also done on shard 0
db::legacy_schema_migrator::migrate(proxy, db, sys_ks, qp.local()).get();
db::schema_tables::save_system_schema(qp.local()).get();
db::schema_tables::recalculate_schema_version(sys_ks, proxy, feature_service.local()).get();

View File

@@ -686,6 +686,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::RAFT_PULL_SNAPSHOT:
case messaging_verb::NOTIFY_BANNED:
case messaging_verb::DIRECT_FD_PING:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
@@ -747,7 +748,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
case messaging_verb::DIRECT_FD_PING:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:

View File

@@ -575,10 +575,15 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
}
res.row.set_range_tombstone(cur.range_tombstone_for_row() + src_cur.range_tombstone());
if (need_preempt()) {
lb = position_in_partition(cur.position());
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
return stop_iteration::no;
}
// FIXME: Compact the row
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
cur.next();
// FIXME: preempt
}
}
{

View File

@@ -3844,3 +3844,83 @@ future<uint32_t> repair_service::get_next_repair_meta_id() {
locator::host_id repair_service::my_host_id() const noexcept {
return _gossiper.local().my_host_id();
}
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
if (ranges1.empty() || ranges2.empty()) {
co_return 0;
}
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
if (a.first_token != b.first_token) {
return a.first_token < b.first_token;
}
return a.last_token < b.last_token;
});
};
// First, merge overlapping and adjacent ranges in ranges2.
sort(ranges2);
utils::chunked_vector<tablet_token_range> merged;
merged.push_back(ranges2[0]);
for (size_t i = 1; i < ranges2.size(); ++i) {
co_await coroutine::maybe_yield();
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
} else {
merged.push_back(ranges2[i]);
}
}
// Count covered ranges using a linear scan
size_t covered_count = 0;
auto it = merged.begin();
auto end = merged.end();
sort(ranges1);
for (const auto& r1 : ranges1) {
co_await coroutine::maybe_yield();
// Advance the merged iterator only if the current merged range ends
// before the current r1 starts.
while (it != end && it->last_token < r1.first_token) {
co_await coroutine::maybe_yield();
++it;
}
// If we have exhausted the merged ranges, no further r1 can be covered
if (it == end) {
break;
}
// Check if the current merged range covers r1.
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
covered_count++;
}
}
co_return covered_count;
}
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
utils::chunked_vector<tablet_token_range> requested_tablets;
utils::chunked_vector<tablet_token_range> finished_tablets;
table_id tid;
if (!_db.local().features().tablet_repair_tasks_table) {
co_return std::nullopt;
}
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
requested_tablets.push_back({entry.first_token, entry.last_token});
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
finished_tablets.push_back({entry.first_token, entry.last_token});
}
tid = entry.table_uuid;
co_return;
});
auto requested = requested_tablets.size();
auto finished_nomerge = finished_tablets.size();
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
auto progress = repair_task_progress{requested, finished, tid};
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}

View File

@@ -99,6 +99,15 @@ public:
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
struct repair_task_progress {
size_t requested;
size_t finished;
table_id table_uuid;
float progress() const {
return requested == 0 ? 1.0 : float(finished) / requested;
}
};
class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<service::topology_state_machine>& _tsm;
sharded<gms::gossiper>& _gossiper;
@@ -222,6 +231,9 @@ private:
public:
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas, locator::tablet_transition_stage stage);
future<std::optional<repair_task_progress>> get_tablet_repair_task_progress(tasks::task_id task_uuid);
private:
future<repair_update_system_table_response> repair_update_system_table_handler(
@@ -326,3 +338,12 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
schema_ptr s, uint64_t seed, repair_master is_master,
reader_permit permit, repair_hasher hasher);
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization = std::nullopt, repair_meta* rm = nullptr);
// A struct to hold the first and last token of a tablet.
struct tablet_token_range {
int64_t first_token;
int64_t last_token;
};
// Function to count the number of ranges in ranges1 covered by the merged ranges of ranges2.
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2);

View File

@@ -2810,26 +2810,26 @@ future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& shar
});
}
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, bool skip_flush) {
if (!skip_flush) {
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
if (!opts.skip_flush) {
co_await flush_table_on_all_shards(sharded_db, uuid);
}
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag, opts);
}
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), opts] (auto& table_name) {
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
});
}
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), opts] (const auto& pair) -> future<> {
auto uuid = pair.second->id();
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
});
}
@@ -2951,7 +2951,12 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
auto name = snapshot_name_opt.value_or(
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
// Use the sstable identifier in snapshot names to allow de-duplication of sstables
// at backup time even if they were migrated across shards or nodes and were renamed a given a new generation.
// We hard-code that here since we have no way to pass this option to auto-snapshot and
// it is always safe to use the sstable identifier for the sstable generation.
auto opts = db::snapshot_options{.use_sstable_identifier = true};
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
}
co_await sharded_db.invoke_on_all([&] (database& db) {

View File

@@ -1040,12 +1040,12 @@ public:
private:
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
future<snapshot_file_set> take_snapshot(sstring jsondir);
future<snapshot_file_set> take_snapshot(sstring jsondir, db::snapshot_options opts);
// Writes the table schema and the manifest of all files in the snapshot directory.
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
public:
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
@@ -2009,9 +2009,9 @@ public:
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, bool skip_flush);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
public:
bool update_column_family(schema_ptr s);

View File

@@ -3268,7 +3268,7 @@ future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstrin
}
// Runs the orchestration code on an arbitrary shard to balance the load.
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
if (so == nullptr) {
throw std::runtime_error("Snapshotting non-local tables is not implemented");
@@ -3291,7 +3291,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
return table_shards->take_snapshot(jsondir);
return table_shards->take_snapshot(jsondir, opts);
}));
});
co_await io_check(sync_directory, jsondir);
@@ -3300,19 +3300,22 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
});
}
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
tlogger.trace("take_snapshot {}", jsondir);
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir, db::snapshot_options opts) {
tlogger.trace("take_snapshot {}: use_sstable_identifier={}", jsondir, opts.use_sstable_identifier);
auto sstable_deletion_guard = co_await get_sstable_list_permit();
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
auto table_names = std::make_unique<std::unordered_set<sstring>>();
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return io_check([sstable, &dir = jsondir] {
return sstable->snapshot(dir);
auto& ks_name = schema()->ks_name();
auto& cf_name = schema()->cf_name();
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&, opts] (sstables::shared_sstable sstable) -> future<> {
auto gen = co_await io_check([sstable, &dir = jsondir, opts] {
return sstable->snapshot(dir, opts.use_sstable_identifier);
});
auto fname = sstable->component_basename(ks_name, cf_name, sstable->get_version(), gen, sstable->get_format(), sstables::component_type::Data);
table_names->insert(fname);
});
co_return make_foreign(std::move(table_names));
}

View File

@@ -267,16 +267,246 @@ class intrusive_set:
class compact_radix_tree:
"""Wrapper around compact_radix_tree::tree for GDB debugging.
Provides iteration and indexing by key (typically column_id) similar to std_map.
The tree stores key-value pairs where keys are unsigned integers.
Example usage:
tree = compact_radix_tree(row['_cells'])
# Iterate over elements
for key, value in tree:
print(f"Column {key}: {value}")
# Access by key
cell = tree[column_id]
# Check if key exists
cell = tree.get(column_id, default=None)
# Get all keys
column_ids = tree.keys()
Note: Due to GDB limitations and compiler optimizations, full tree traversal
is challenging. The implementation provides the std_map-like API but may not
be able to extract all elements in optimized builds. In such cases, consider:
- Using debug builds (-g -O0) for better introspection
- Examining the tree structure directly with GDB commands
- Using the C++ tree printer (compact_radix_tree::printer) in test code
"""
def __init__(self, ref):
"""Initialize from a gdb.Value representing a compact_radix_tree::tree instance."""
self.ref = ref
self.root = ref['_root']['_v']
# Get template arguments to determine key and value types
tree_type = ref.type.strip_typedefs()
self.value_type = tree_type.template_argument(0)
# Index type defaults to unsigned int if not specified
try:
self.key_type = tree_type.template_argument(1)
except RuntimeError:
self.key_type = gdb.lookup_type('unsigned int')
# Cache for elements collected during traversal
self._elements = None
# Constants from compact-radix-tree.hh
# enum class layout : uint8_t { nil, indirect_tiny, indirect_small, ...}
self.LAYOUT_NIL = 0
self.RADIX_BITS = 7
self.RADIX_MASK = (1 << self.RADIX_BITS) - 1
def is_empty(self):
"""Check if the tree is empty."""
try:
layout = int(self.root['_base_layout'])
return layout == self.LAYOUT_NIL
except (gdb.error, gdb.MemoryError):
return True
def _collect_elements(self):
"""Collect all elements from the tree by traversing its structure.
Returns a list of (key, value) tuples sorted by key.
This is cached after first call.
"""
if self._elements is not None:
return self._elements
self._elements = []
if self.is_empty():
return self._elements
try:
# Traverse the tree structure
# The tree is a radix tree with nodes that can be inner or leaf nodes
# We'll do a depth-first traversal
self._visit_node(self.root, 0, 0)
# Sort by key to ensure correct ordering
self._elements.sort(key=lambda x: x[0])
except (gdb.error, gdb.MemoryError) as e:
# If traversal fails, we have at least collected what we could
gdb.write(f"Warning: Failed to fully traverse compact_radix_tree: {e}\n")
return self._elements
def _visit_node(self, node, depth, prefix):
"""Recursively visit a node and collect elements.
Args:
node: The node_head to visit
depth: Current depth in the tree
prefix: Key prefix accumulated from parent nodes
"""
try:
# Get node properties
node_prefix = int(node['_prefix'])
node_size = int(node['_size'])
layout = int(node['_base_layout'])
if node_size == 0 or layout == self.LAYOUT_NIL:
return
# Calculate the key size in bits
# For uint32_t (column_id), this would be 32 bits
key_bits = self.key_type.sizeof * 8
# Calculate leaf depth: the tree uses RADIX_BITS (7) bits per level
# leaf_depth = ceil(key_bits / RADIX_BITS) - 1
# The -1 accounts for the root level not being counted in depth
leaf_depth = (key_bits + self.RADIX_BITS - 1) // self.RADIX_BITS - 1
# Extract prefix information from node_prefix
# Prefix encoding: lower RADIX_BITS contain the prefix length,
# upper bits contain the actual prefix value
prefix_len = node_prefix & self.RADIX_MASK # Extract lower 7 bits for length
prefix_value = node_prefix & ~self.RADIX_MASK # Extract upper bits for value
# Update prefix with node's contribution
current_prefix = prefix | prefix_value
# Check if this is a leaf node (at maximum depth)
if depth + prefix_len >= leaf_depth:
# This is a leaf node - try to extract values
self._collect_leaf_elements(node, current_prefix)
else:
# This is an inner node - recurse into children
# Inner nodes contain pointers to other nodes
# The structure is complex and varies by layout type
# For now, we'll use a best-effort approach
pass
except (gdb.error, gdb.MemoryError, ValueError) as e:
# Skip nodes that can't be accessed
pass
def _collect_leaf_elements(self, leaf_node, prefix):
"""Collect elements from a leaf node.
Args:
leaf_node: The leaf node_head
prefix: Key prefix for elements in this leaf
"""
try:
# Leaf nodes store the actual values
# The exact structure depends on the layout type
# Since the compiler may optimize away structure details,
# we use a heuristic approach
# For now, we acknowledge that without full tree traversal support,
# we can't reliably extract all elements
# This would require implementing the full tree traversal logic
# which is complex given GDB's limitations
pass
except (gdb.error, gdb.MemoryError):
pass
def __len__(self):
"""Return the number of elements in the tree."""
elements = self._collect_elements()
return len(elements)
def __iter__(self):
"""Iterate over (key, value) pairs in the tree in ascending key order.
Yields:
Tuples of (key, value) where key is the integer index and value is the stored element.
"""
elements = self._collect_elements()
for key, value in elements:
yield (key, value)
def __getitem__(self, key):
"""Get value at given key (column_id).
Args:
key: Integer key (column_id) to look up
Returns:
The value at the given key
Raises:
KeyError: If key not found in tree
"""
elements = self._collect_elements()
for k, v in elements:
if k == key:
return v
raise KeyError(f"Key {key} not found in compact_radix_tree")
def get(self, key, default=None):
"""Get value at given key, or default if not found.
Args:
key: Integer key to look up
default: Value to return if key not found
Returns:
The value at the given key, or default if not found
"""
try:
return self[key]
except KeyError:
return default
def keys(self):
"""Return a list of all keys in the tree."""
elements = self._collect_elements()
return [k for k, v in elements]
def values(self):
"""Return a list of all values in the tree."""
elements = self._collect_elements()
return [v for k, v in elements]
def items(self):
"""Return a list of (key, value) tuples."""
return list(self._collect_elements())
def to_string(self):
if self.root['_base_layout'] == 0:
"""Return a string representation for printing."""
if self.is_empty():
return '<empty>'
# Compiler optimizes-away lots of critical stuff, so
# for now just show where the tree is
return 'compact radix tree @ 0x%x' % self.root
# Try to provide more useful information
try:
elements = self._collect_elements()
if elements:
keys = [k for k, v in elements]
return f'compact_radix_tree with {len(elements)} element(s), keys: {keys}'
else:
# We know it's not empty but couldn't collect elements
# This happens when compiler optimizations prevent tree traversal
try:
size = int(self.root['_size'])
layout = int(self.root['_base_layout'])
return f'compact_radix_tree with size={size}, layout={layout} @ {hex(int(self.root.address))} (elements not accessible, use debug build for full introspection)'
except (gdb.error, gdb.MemoryError, ValueError, AttributeError):
return f'compact_radix_tree @ {hex(int(self.root.address))} (structure not fully accessible)'
except (gdb.error, gdb.MemoryError, ValueError, AttributeError) as e:
# Fallback to simple representation
return f'compact_radix_tree @ {hex(int(self.root.address))} (error: {e})'
class intrusive_btree:

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "seastar/core/scheduling.hh"
#include "utils/assert.hh"
#include <unordered_set>
@@ -17,6 +18,7 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/switch_to.hh>
#include "utils/log.hh"
@@ -118,7 +120,7 @@ struct failure_detector::impl {
// Fetches endpoint updates from _endpoint_queue and performs the add/remove operation.
// Runs on shard 0 only.
future<> update_endpoint_fiber();
future<> update_endpoint_fiber(seastar::scheduling_group sg);
future<> _update_endpoint_fiber = make_ready_future<>();
// Workers running on this shard.
@@ -140,7 +142,7 @@ struct failure_detector::impl {
// The unregistering process requires cross-shard operations which we perform on this fiber.
future<> _destroy_subscriptions = make_ready_future<>();
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout);
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg);
~impl();
// Inform update_endpoint_fiber() about an added/removed endpoint.
@@ -177,19 +179,19 @@ struct failure_detector::impl {
};
failure_detector::failure_detector(
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout))
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout, sg))
{}
failure_detector::impl::impl(
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _parent(parent), _pinger(pinger), _clock(clock), _ping_period(ping_period), _ping_timeout(ping_timeout) {
if (this_shard_id() != 0) {
return;
}
_num_workers.resize(smp::count, 0);
_update_endpoint_fiber = update_endpoint_fiber();
_update_endpoint_fiber = update_endpoint_fiber(sg);
}
void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoint_update update) {
@@ -205,9 +207,9 @@ void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoi
_endpoint_changed.signal();
}
future<> failure_detector::impl::update_endpoint_fiber() {
future<> failure_detector::impl::update_endpoint_fiber(seastar::scheduling_group sg) {
SCYLLA_ASSERT(this_shard_id() == 0);
co_await coroutine::switch_to(sg);
while (true) {
co_await _endpoint_changed.wait([this] { return !_endpoint_updates.empty(); });
@@ -480,7 +482,7 @@ static future<bool> ping_with_timeout(pinger::endpoint_id id, clock::timepoint_t
}
});
auto f = pinger.ping(id, timeout_as);
auto f = pinger.ping(id, timeout, timeout_as, c);
auto sleep_and_abort = [] (clock::timepoint_t timeout, abort_source& timeout_as, clock& c) -> future<> {
co_await c.sleep_until(timeout, timeout_as).then_wrapped([&timeout_as] (auto&& f) {
// Avoid throwing if sleep was aborted.

View File

@@ -19,26 +19,6 @@ class abort_source;
namespace direct_failure_detector {
class pinger {
public:
// Opaque endpoint ID.
// A specific implementation of `pinger` maps those IDs to 'real' addresses.
using endpoint_id = utils::UUID;
// Send a message to `ep` and wait until it responds.
// The wait can be aborted using `as`.
// Abort should be signalized with `abort_requested_exception`.
//
// If the ping fails in an expected way (e.g. the endpoint is down and refuses to connect),
// returns `false`. If it succeeds, returns `true`.
virtual future<bool> ping(endpoint_id ep, abort_source& as) = 0;
protected:
// The `pinger` object must not be destroyed through the `pinger` interface.
// `failure_detector` does not take ownership of `pinger`, only a non-owning reference.
~pinger() = default;
};
// A clock that uses abstract units to measure time.
// The implementation is responsible for periodically advancing the clock.
//
@@ -60,12 +40,33 @@ public:
// Aborts should be signalized using `seastar::sleep_aborted`.
virtual future<> sleep_until(timepoint_t tp, abort_source& as) = 0;
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const = 0;
protected:
// The `clock` object must not be destroyed through the `clock` interface.
// `failure_detector` does not take ownership of `clock`, only a non-owning reference.
~clock() = default;
};
class pinger {
public:
// Opaque endpoint ID.
// A specific implementation of `pinger` maps those IDs to 'real' addresses.
using endpoint_id = utils::UUID;
// Send a message to `ep` and wait until it responds.
// The wait can be aborted using `as`.
// Abort should be signalized with `abort_requested_exception`.
//
// If the ping fails in an expected way (e.g. the endpoint is down and refuses to connect),
// returns `false`. If it succeeds, returns `true`.
virtual future<bool> ping(endpoint_id ep, clock::timepoint_t timeout, abort_source& as, clock& c) = 0;
protected:
// The `pinger` object must not be destroyed through the `pinger` interface.
// `failure_detector` does not take ownership of `pinger`, only a non-owning reference.
~pinger() = default;
};
class listener {
public:
// Called when an endpoint in the detected set (added by `failure_detector::add_endpoint`) responds to a ping
@@ -127,7 +128,10 @@ public:
// Duration after which a ping is aborted, so that next ping can be started
// (pings are sent sequentially).
clock::interval_t ping_timeout
clock::interval_t ping_timeout,
// Scheduling group used for fibers inside the failure detector.
seastar::scheduling_group sg
);
~failure_detector();

View File

@@ -18,6 +18,7 @@
#include "utils/error_injection.hh"
#include "seastar/core/shared_future.hh"
#include <chrono>
#include <seastar/core/coroutine.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/sleep.hh>
@@ -202,8 +203,11 @@ void raft_group_registry::init_rpc_verbs() {
});
ser::raft_rpc_verbs::register_direct_fd_ping(&_ms,
[this] (const rpc::client_info&, raft::server_id dst) -> future<direct_fd_ping_reply> {
// XXX: update address map here as well?
[this] (const rpc::client_info&, rpc::opt_time_point timeout, raft::server_id dst) -> future<direct_fd_ping_reply> {
if (timeout && *timeout <= netw::messaging_service::clock_type::now()) {
throw timed_out_error{};
}
if (_my_id != dst) {
return make_ready_future<direct_fd_ping_reply>(direct_fd_ping_reply {
@@ -213,19 +217,10 @@ void raft_group_registry::init_rpc_verbs() {
});
}
return container().invoke_on(0, [] (raft_group_registry& me) -> future<direct_fd_ping_reply> {
bool group0_alive = false;
if (me._group0_id) {
auto* group0_server = me.find_server(*me._group0_id);
if (group0_server && group0_server->is_alive()) {
group0_alive = true;
}
return make_ready_future<direct_fd_ping_reply>(direct_fd_ping_reply {
.result = service::group_liveness_info{
.group0_alive = _group0_is_alive,
}
co_return direct_fd_ping_reply {
.result = service::group_liveness_info{
.group0_alive = group0_alive,
}
};
});
});
}
@@ -380,6 +375,12 @@ future<> raft_group_registry::start_server_for_group(raft_server_for_group new_g
co_await server.abort();
std::rethrow_exception(ex);
}
if (gid == _group0_id) {
co_await container().invoke_on_all([] (raft_group_registry& rg) {
rg._group0_is_alive = true;
});
}
}
future<> raft_group_registry::abort_server(raft::group_id gid, sstring reason) {
@@ -389,14 +390,18 @@ future<> raft_group_registry::abort_server(raft::group_id gid, sstring reason) {
if (const auto it = _servers.find(gid); it != _servers.end()) {
auto& [gid, s] = *it;
if (!s.aborted) {
if (gid == _group0_id) {
co_await container().invoke_on_all([] (raft_group_registry& rg) {
rg._group0_is_alive = false;
});
}
s.aborted = s.server->abort(std::move(reason))
.handle_exception([gid] (std::exception_ptr ex) {
rslog.warn("Failed to abort raft group server {}: {}", gid, ex);
});
}
return s.aborted->get_future();
co_await s.aborted->get_future();
}
return make_ready_future<>();
}
unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const {
@@ -517,11 +522,13 @@ future<> raft_server_with_timeouts::read_barrier(seastar::abort_source* as, std:
}, "read_barrier", as, timeout);
}
future<bool> direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) {
future<bool> direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) {
auto dst_id = raft::server_id{id};
try {
auto reply = co_await ser::raft_rpc_verbs::send_direct_fd_ping(&_ms, locator::host_id{id}, as, dst_id);
std::chrono::milliseconds timeout_ms = c.to_milliseconds(timeout);
netw::messaging_service::clock_type::time_point deadline = netw::messaging_service::clock_type::now() + timeout_ms;
auto reply = co_await ser::raft_rpc_verbs::send_direct_fd_ping(&_ms, locator::host_id{id}, deadline, as, dst_id);
if (auto* wrong_dst = std::get_if<wrong_destination>(&reply.result)) {
// FIXME: after moving to host_id based verbs we will not get `wrong_destination`
// any more since the connection will fail
@@ -554,4 +561,11 @@ future<> direct_fd_clock::sleep_until(direct_failure_detector::clock::timepoint_
return sleep_abortable(t - n, as);
}
std::chrono::milliseconds direct_fd_clock::to_milliseconds(direct_failure_detector::clock::timepoint_t tp) const {
auto t = base::time_point{base::duration{tp}};
auto n = base::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(t - n);
}
} // end of namespace service

View File

@@ -127,6 +127,7 @@ private:
// My Raft ID. Shared between different Raft groups.
raft::server_id _my_id;
bool _group0_is_alive = false;
public:
raft_group_registry(raft::server_id my_id, netw::messaging_service& ms,
direct_failure_detector::failure_detector& fd);
@@ -181,6 +182,9 @@ public:
unsigned shard_for_group(const raft::group_id& gid) const;
shared_ptr<raft::failure_detector> failure_detector();
direct_failure_detector::failure_detector& direct_fd() { return _direct_fd; }
bool is_group0_alive() const {
return _group0_is_alive;
}
};
// Implementation of `direct_failure_detector::pinger` which uses DIRECT_FD_PING verb for pinging.
@@ -198,7 +202,7 @@ public:
direct_fd_pinger(const direct_fd_pinger&) = delete;
direct_fd_pinger(direct_fd_pinger&&) = delete;
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override;
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override;
};
// XXX: find a better place to put this?
@@ -207,6 +211,7 @@ struct direct_fd_clock : public direct_failure_detector::clock {
direct_failure_detector::clock::timepoint_t now() noexcept override;
future<> sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) override;
std::chrono::milliseconds to_milliseconds(direct_failure_detector::clock::timepoint_t tp) const override;
};
} // end of namespace service

View File

@@ -6688,10 +6688,11 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
}
};
auto request = seastar::make_shared<read_cas_request>();
auto request = std::make_unique<read_cas_request>();
auto* request_ptr = request.get();
return cas(std::move(s), std::move(cas_shard), request, cmd, std::move(partition_ranges), std::move(query_options),
cl, db::consistency_level::ANY, timeout, cas_timeout, false).then([request] (bool is_applied) mutable {
return cas(std::move(s), std::move(cas_shard), *request_ptr, cmd, std::move(partition_ranges), std::move(query_options),
cl, db::consistency_level::ANY, timeout, cas_timeout, false).then([request = std::move(request)] (bool is_applied) mutable {
return make_ready_future<coordinator_query_result>(std::move(request->res));
});
}
@@ -6754,11 +6755,13 @@ static mutation_write_failure_exception read_failure_to_write(read_failure_excep
* NOTE: `cmd` argument can be nullptr, in which case it's guaranteed that this function would not perform
* any reads of committed values (in case user of the function is not interested in them).
*
* NOTE: The `request` object must be guaranteed to be alive until the returned future is resolved.
*
* WARNING: the function must be called on a shard that owns the key cas() operates on.
* The cas_shard must be created *before* selecting the shard, to protect against
* concurrent tablet migrations.
*/
future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_request& request, lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges, storage_proxy::coordinator_query_options query_options,
db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn,
clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write, cdc::per_request_options cdc_opts) {
@@ -6859,7 +6862,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, shared_p
qr = std::move(cqr.query_result);
}
auto mutation = request->apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot), cdc_opts);
auto mutation = request.apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot), cdc_opts);
condition_met = true;
if (!mutation) {
if (write) {

View File

@@ -829,7 +829,7 @@ public:
clock_type::time_point timeout,
tracing::trace_state_ptr trace_state = nullptr);
future<bool> cas(schema_ptr schema, cas_shard cas_shard, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
future<bool> cas(schema_ptr schema, cas_shard cas_shard, cas_request& request, lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges, coordinator_query_options query_options,
db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn,
clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write = true, cdc::per_request_options cdc_opts = {});

View File

@@ -6822,6 +6822,7 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
});
}
auto ts = db_clock::now();
for (const auto& token : tokens) {
auto tid = tmap.get_tablet_id(token);
auto& tinfo = tmap.get_tablet_info(tid);
@@ -6835,6 +6836,20 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
.set_repair_task_info(last_token, repair_task_info, _feature_service)
.build());
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::requested,
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
.timestamp = ts,
.table_uuid = table,
};
if (_feature_service.tablet_repair_tasks_table) {
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
for (auto& m : cmuts) {
updates.push_back(std::move(m));
}
}
}
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);

View File

@@ -136,6 +136,17 @@ db::tablet_options combine_tablet_options(R&& opts) {
return combined_opts;
}
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
auto tokens_view = s | std::views::split(delimiter)
| std::views::transform([](auto&& range) {
return std::string_view(&*range.begin(), std::ranges::distance(range));
})
| std::views::transform([](std::string_view sv) {
return locator::tablet_id(std::stoul(std::string(sv)));
});
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}
// Used to compare different migration choices in regard to impact on load imbalance.
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
struct migration_badness {
@@ -893,6 +904,8 @@ public:
co_await coroutine::maybe_yield();
auto& config = tmap.repair_scheduler_config();
auto now = db_clock::now();
auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
auto gid = locator::global_tablet_id{table, id};
// Skip tablet that is in transitions.
@@ -913,6 +926,11 @@ public:
co_return;
}
if (skip_tablets.contains(id)) {
lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
co_return;
}
// Avoid rescheduling a failed tablet repair in a loop
// TODO: Allow user to config
const auto min_reschedule_time = std::chrono::seconds(5);

View File

@@ -10,6 +10,7 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "repair/row_level.hh"
#include "service/task_manager_module.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
@@ -109,6 +110,16 @@ future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(ta
tid = tmap.next_tablet(*tid);
}
}
// Check if the task id is present in the repair task table
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
if (progress && progress->requested > 0) {
co_return tasks::virtual_task_hint{
.table_id = progress->table_uuid,
.task_type = locator::tablet_task_type::user_repair,
.tablet_id = std::nullopt,
};
}
co_return std::nullopt;
}
@@ -243,7 +254,20 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
size_t sched_nr = 0;
auto tmptr = _ss.get_token_metadata_ptr();
auto& tmap = tmptr->tablets().get_tablet_map(table);
bool repair_task_finished = false;
bool repair_task_pending = false;
if (is_repair_task(task_type)) {
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
if (progress) {
res.status.progress.completed = progress->finished;
res.status.progress.total = progress->requested;
res.status.progress_units = "tablets";
if (progress->requested > 0 && progress->requested == progress->finished) {
repair_task_finished = true;
} if (progress->requested > 0 && progress->requested > progress->finished) {
repair_task_pending = true;
}
}
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto& task_info = info.repair_task_info;
if (task_info.tablet_task_id.uuid() == id.uuid()) {
@@ -275,7 +299,17 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
co_return res;
}
// FIXME: Show finished tasks.
if (repair_task_pending) {
// When repair_task_pending is true, the res.tablets will be empty iff the request is aborted by user.
res.status.state = res.tablets.empty() ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::running;
co_return res;
}
if (repair_task_finished) {
res.status.state = tasks::task_manager::task_state::done;
co_return res;
}
co_return std::nullopt;
}

View File

@@ -1205,6 +1205,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
// Record the repair task update muations
utils::chunked_vector<canonical_mutation> repair_task_updates;
service::session_id session_id;
};
@@ -1737,6 +1739,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
dst = dst_opt.value().host;
}
// Update repair task
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(tinfo.repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::finished,
.first_token = dht::token::to_int64(tmap.get_first_token(gid.tablet)),
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
.table_uuid = gid.table,
};
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
@@ -1745,6 +1755,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
if (_feature_service.tablet_repair_tasks_table) {
entry.timestamp = db_clock::now();
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
}
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
dst, tablet, duration, res.repair_time);
})) {
@@ -1763,6 +1777,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_repair_task_info(last_token, _feature_service)
.del_session(last_token);
for (auto& m : tablet_state.repair_task_updates) {
updates.push_back(std::move(m));
}
// Skip update repair time in case hosts filter or dcs filter is set.
if (valid && is_filter_off) {
auto sched_time = tinfo.repair_task_info.sched_time;

View File

@@ -57,7 +57,10 @@ public:
index_list indexes;
index_consumer(logalloc::region& r, schema_ptr s)
: _s(std::move(s))
: _s(s)
, _alloc_section(abstract_formatter([s] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "index_consumer {}.{}", s->ks_name(), s->cf_name());
}))
, _region(r)
{ }
@@ -785,6 +788,9 @@ public:
_sstable->manager().get_cache_tracker().region(),
_sstable->manager().get_cache_tracker().get_partition_index_cache_stats()))
, _index_cache(caching ? *_sstable->_index_cache : *_local_index_cache)
, _alloc_section(abstract_formatter([sst = _sstable] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "index_reader {}", sst->get_filename());
}))
, _region(_sstable->manager().get_cache_tracker().region())
, _use_caching(caching)
, _single_page_read(single_partition_read) // all entries for a given partition are within a single page

View File

@@ -284,6 +284,9 @@ public:
, _clustering_parser(s, permit, _ctr.clustering_column_value_fix_legths(), true)
, _block_parser(s, permit, _ctr.clustering_column_value_fix_legths())
, _permit(std::move(permit))
, _as(abstract_formatter([s] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "cached_promoted_index {}.{}", s.ks_name(), s.cf_name());
}))
{ }
~cached_promoted_index() {

View File

@@ -2117,11 +2117,14 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
}
sstable_id sid;
if (generation().is_uuid_based()) {
// Force a random sstable_id for testing purposes
bool random_sstable_identifier = utils::get_local_injector().is_enabled("random_sstable_identifier");
if (!random_sstable_identifier && generation().is_uuid_based()) {
sid = sstable_id(generation().as_uuid());
} else {
sid = sstable_id(utils::UUID_gen::get_time_UUID());
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
auto msg = random_sstable_identifier ? "forced random sstable_id" : "has numerical generation";
sstlog.info("SSTable {} {}. SSTable identifier in scylla_metadata set to {}", get_filename(), msg, sid);
}
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableIdentifier>(scylla_metadata::sstable_identifier{sid});
@@ -2540,8 +2543,11 @@ std::vector<std::pair<component_type, sstring>> sstable::all_components() const
return all;
}
future<> sstable::snapshot(const sstring& dir) const {
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
future<generation_type> sstable::snapshot(const sstring& dir, bool use_sstable_identifier) const {
// Use the sstable identifier UUID if available to enable global de-duplication of sstables in backup.
generation_type gen = (use_sstable_identifier && _sstable_identifier) ? generation_type(_sstable_identifier->uuid()) : _generation;
co_await _storage->snapshot(*this, dir, storage::absolute_path::yes, gen);
co_return gen;
}
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {

View File

@@ -397,6 +397,10 @@ public:
return _version;
}
format_types get_format() const {
return _format;
}
// Returns the total bytes of all components.
uint64_t bytes_on_disk() const;
file_size_stats get_file_size_stats() const;
@@ -438,7 +442,10 @@ public:
std::vector<std::pair<component_type, sstring>> all_components() const;
future<> snapshot(const sstring& dir) const;
// When use_sstable_identifier is true and the sstable identifier is available,
// use it to name the sstable in the snapshot, rather than the sstable generation.
// Returns the generation used for snapshot.
future<generation_type> snapshot(const sstring& dir, bool use_sstable_identifier = false) const;
// Delete the sstable by unlinking all sstable files
// Ignores all errors.

View File

@@ -31,6 +31,7 @@
#include "replica/database.hh"
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "utils/rjson.hh"
#include "partition_slice_builder.hh"
#include "mutation/frozen_mutation.hh"
#include "test/lib/mutation_source_test.hh"
@@ -38,6 +39,7 @@
#include "service/migration_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/generation_type.hh"
#include "sstables/sstable_version.hh"
#include "db/config.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
@@ -51,6 +53,7 @@
#include "db/system_keyspace.hh"
#include "db/view/view_builder.hh"
#include "replica/mutation_dump.hh"
#include "utils/error_injection.hh"
using namespace std::chrono_literals;
using namespace sstables;
@@ -612,13 +615,13 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
});
}
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", bool skip_flush = false) {
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
try {
auto uuid = e.db().local().find_uuid(ks_name, cf_name);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, skip_flush);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts);
} catch (...) {
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={} use_sstable_identifier={}: {}",
ks_name, cf_name, snapshot_name, opts.skip_flush, opts.use_sstable_identifier, std::current_exception());
throw;
}
}
@@ -632,6 +635,37 @@ future<std::set<sstring>> collect_files(fs::path path) {
co_return ret;
}
static bool is_component(const sstring& fname, const sstring& suffix) {
return fname.ends_with(suffix);
}
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
// Verify manifest against the files in the snapshots dir
auto pred = [&suffix] (const sstring& fname) {
return is_component(fname, suffix);
};
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
}
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
static future<> validate_manifest(const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir) {
sstring suffix = "-Data.db";
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
std::set<sstring> sstables_in_manifest;
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
auto manifest_json = rjson::parse(manifest_str);
auto& manifest_files = manifest_json["files"];
BOOST_REQUIRE(manifest_files.IsArray());
for (auto& f : manifest_files.GetArray()) {
if (is_component(f.GetString(), suffix)) {
sstables_in_manifest.insert(f.GetString());
}
}
testlog.debug("SSTables in manifest.json: {}", fmt::join(sstables_in_manifest, ", "));
BOOST_REQUIRE_EQUAL(sstables_in_snapshot, sstables_in_manifest);
}
static future<> snapshot_works(const std::string& table_name) {
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
take_snapshot(e, "ks", table_name).get();
@@ -651,6 +685,8 @@ static future<> snapshot_works(const std::string& table_name) {
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
validate_manifest(snapshot_dir, in_snapshot_dir).get();
return make_ready_future<>();
}, true);
}
@@ -669,7 +705,8 @@ SEASTAR_TEST_CASE(index_snapshot_works) {
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
db::snapshot_options opts = {.skip_flush = true};
take_snapshot(e, "ks", "cf", "test", opts).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -682,6 +719,41 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
});
}
SEASTAR_TEST_CASE(snapshot_use_sstable_identifier_works) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return make_ready_future<>();
#endif
sstring table_name = "cf";
// Force random sstable identifiers, otherwise the initial sstable_id is equal
// to the sstable generation and the test can't distinguish between them.
utils::get_local_injector().enable("random_sstable_identifier", false);
return do_with_some_data({table_name}, [table_name] (cql_test_env& e) -> future<> {
sstring tag = "test";
db::snapshot_options opts = {.use_sstable_identifier = true};
co_await take_snapshot(e, "ks", table_name, tag, opts);
auto& cf = e.local_db().find_column_family("ks", table_name);
auto table_directory = table_dir(cf);
auto snapshot_dir = table_directory / sstables::snapshots_dir / tag;
auto in_table_dir = co_await collect_files(table_directory);
// snapshot triggered a flush and wrote the data down.
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
testlog.info("Files in table dir: {}", fmt::join(in_table_dir, ", "));
auto in_snapshot_dir = co_await collect_files(snapshot_dir);
testlog.info("Files in snapshot dir: {}", fmt::join(in_snapshot_dir, ", "));
in_table_dir.insert("manifest.json");
in_table_dir.insert("schema.cql");
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir.size(), in_snapshot_dir.size());
BOOST_REQUIRE_NE(in_table_dir, in_snapshot_dir);
co_await validate_manifest(snapshot_dir, in_snapshot_dir);
}, true);
}
SEASTAR_TEST_CASE(snapshot_list_okay) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -1456,7 +1528,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
}
BOOST_REQUIRE(found);
co_await take_snapshot(e, "ks", "cf", "test", true /* skip_flush */);
co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});
testlog.debug("Expected: {}", expected);

View File

@@ -346,4 +346,60 @@ SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) {
});
}
SEASTAR_TEST_CASE(test_tablet_token_range_count) {
{
// Simple case: one large range covers a smaller one
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges overlap and should merge to cover r1
// r2: [0, 50] + [40, 100] -> merges to [0, 100]
// r1: [10, 90] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{10, 90}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 50}, {40, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges are adjacent (contiguous) and should merge
// r2: [0, 10] + [11, 20] -> merges to [0, 20]
// r1: [5, 15] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}, {11, 20}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r1 overlaps r2 but is not FULLY contained
// r2: [0, 10]
// r1: [5, 15] (Ends too late), [ -5, 5 ] (Starts too early)
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}, {-5, 5}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 0);
}
{
// A single merged range in r2 covers multiple distinct ranges in r1
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}, {30, 40}, {50, 60}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 3);
}
{
// Inputs are provided in random order, ensuring the internal sort works
utils::chunked_vector<tablet_token_range> r1 = {{50, 60}, {10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{50, 100}, {0, 40}};
// r2 merges effectively to [0, 40] and [50, 100]
// Both r1 items are covered
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 2);
}
{
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2_empty = {};
utils::chunked_vector<tablet_token_range> r1_empty = {};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2_empty) == 0);
BOOST_REQUIRE(co_await count_finished_tablets(r1_empty, r2) == 0);
}
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -13,7 +13,8 @@ import ssl
import tempfile
import platform
import urllib.parse
from multiprocessing import Event, Process
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test.pylib.runner import testpy_test_fixture_scope
@@ -186,15 +187,14 @@ async def manager_api_sock_path(request: pytest.FixtureRequest, testpy_test: Tes
await asyncio.get_running_loop().run_in_executor(None, stop_event.wait)
finally:
await mgr.stop()
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, run_manager())
start_event.wait()
manager_process = Process(target=lambda: asyncio.run(run_manager()))
manager_process.start()
start_event.wait()
yield sock_path
yield sock_path
stop_event.set()
manager_process.join()
stop_event.set()
future.result()
@pytest.fixture(scope=testpy_test_fixture_scope)

View File

@@ -16,16 +16,26 @@ from test.cluster.util import get_topology_coordinator, new_test_keyspace, recon
logger = logging.getLogger(__name__)
# This test makes sure that view building is done mainly in the streaming scheduling group
# and not the gossip scheduling group. We do that by measuring the time each group was
# busy during the view building process and confirming that the gossip group was busy
# much less than the streaming group.
# Reproduces https://github.com/scylladb/scylladb/issues/21232
# This test makes sure that view building is done mainly in the streaming
# scheduling group. We check that by grepping all relevant logs in TRACE mode
# and verifying that they come from the streaming scheduling group.
#
# For more context, see: https://github.com/scylladb/scylladb/issues/21232.
# This test reproduces the issue in non-tablet mode.
@pytest.mark.asyncio
@skip_mode('debug', 'the test needs to do some work which takes too much time in debug mode')
async def test_view_building_scheduling_group(manager: ManagerClient):
server = await manager.server_add()
# Note: The view building coordinator works in the gossiping scheduling group,
# and we intentionally omit it here.
# Note: We include "view" for keyspaces that don't use the view building coordinator
# and will follow the legacy path instead.
loggers = ["view_building_worker", "view_consumer", "view_update_generator", "view"]
# Flatten the list of lists.
cmdline = sum([["--logger-log-level", f"{logger}=trace"] for logger in loggers], [])
server = await manager.server_add(cmdline=cmdline)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (p int, c int, PRIMARY KEY (p, c))")
@@ -35,21 +45,30 @@ async def test_view_building_scheduling_group(manager: ManagerClient):
batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(inserts) + "\nAPPLY BATCH\n"
await manager.cql.run_async(batch)
metrics_before = await manager.metrics.query(server.ip_addr)
ms_gossip_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT p, c FROM {ks}.tab WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
await wait_for_view(cql, 'mv', 1)
metrics_after = await manager.metrics.query(server.ip_addr)
ms_gossip_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
ms_streaming = ms_streaming_after - ms_streaming_before
ms_statement = ms_gossip_after - ms_gossip_before
ratio = ms_statement / ms_streaming
print(f"ms_streaming: {ms_streaming}, ms_statement: {ms_statement}, ratio: {ratio}")
assert ratio < 0.1
logger_alternative = "|".join(loggers)
pattern = rf"\[shard [0-9]+:(.+)\] ({logger_alternative}) - "
results = await log.grep(pattern, from_mark=mark)
# Sanity check. If there are no logs, something's wrong.
assert len(results) > 0
# In case of non-tablet keyspaces, we won't use the view building coordinator.
# Instead, view updates will follow the legacy path. Along the way, we'll observe
# this message, which will be printed using another scheduling group, so let's
# filter it out.
predicate = lambda result: f"Building view {ks}.mv, starting at token" not in result[0]
results = list(filter(predicate, results))
# Take the first parenthesized match for each result, i.e. the scheduling group.
sched_groups = [matches[1] for _, matches in results]
assert all(sched_group == "strm" for sched_group in sched_groups)
# A sanity check test ensures that starting and shutting down Scylla when view building is
# disabled is conducted properly and we don't run into any issues.

View File

@@ -43,6 +43,86 @@ async def guarantee_repair_time_next_second():
# different than the previous one.
await asyncio.sleep(1)
async def do_test_tablet_repair_progress_split_merge(manager: ManagerClient, do_split=False, do_merge=False):
nr_tablets = 16
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=True, tablets=nr_tablets)
token = 'all'
logs = []
for s in servers:
logs.append(await manager.server_open_log(s.server_id))
# Skip repair for the listed tablet id
nr_tablets_skipped = 4
nr_tablets_repaired = nr_tablets - nr_tablets_skipped
await inject_error_on(manager, "tablet_repair_skip_sched", servers, params={'value':"0,1,5,8"})
# Request to repair all tablets
repair_res = await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
logging.info(f'{repair_res=}')
tablet_task_id = repair_res['tablet_task_id']
async def get_task_status(desc):
task_status = await manager.api.get_task_status(servers[0].ip_addr, tablet_task_id)
completed = int(task_status['progress_completed'])
total = int(task_status['progress_total'])
logging.info(f'{desc=} {completed=} {total=} {task_status=}')
return completed, total
async def wait_task_progress(wanted_complete, wanted_total):
while True:
completed, total = await get_task_status("wait_task_progress")
if completed == wanted_complete and total == wanted_total:
break
await asyncio.sleep(1)
async def get_task_status_and_check(desc):
completed, total = await get_task_status(desc)
assert completed == nr_tablets_repaired
assert total == nr_tablets
# 12 out of 16 tablets should finish
await wait_task_progress(nr_tablets_repaired, nr_tablets)
if do_split:
await get_task_status_and_check("before_split")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_increase", servers)
await logs[0].wait_for('Detected tablet split for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_increase", servers)
await get_task_status_and_check("after_split")
if do_merge:
await get_task_status_and_check("before_merge")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
await logs[0].wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
await get_task_status_and_check("after_merge")
# Wait for all repair to finish after all tablets can be scheduled to run repair
await inject_error_off(manager, "tablet_repair_skip_sched", servers)
await wait_task_progress(nr_tablets, nr_tablets)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=False, do_merge=False)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress_split(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=True)
@pytest.mark.asyncio
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/26844")
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_progress_merge(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_merge=True)
@pytest.mark.asyncio
async def test_tablet_manual_repair(manager: ManagerClient):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)

View File

@@ -4,7 +4,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import ConsistencyLevel
from cassandra.query import ConsistencyLevel, SimpleStatement
from cassandra.policies import FallthroughRetryPolicy
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
@@ -1596,7 +1597,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
async def truncate_table():
await asyncio.sleep(10)
logger.info("Executing truncate during bootstrap")
await cql.run_async(f"TRUNCATE {ks}.test USING TIMEOUT 1m")
await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")

View File

@@ -115,7 +115,7 @@ def compact_keyspace(cql, ks, flush_memtables=True):
args.extend([ks, cf])
run_nodetool(cql, "compact", *args)
def take_snapshot(cql, table, tag, skip_flush):
def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
ks, cf = table.split('.')
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
@@ -123,6 +123,8 @@ def take_snapshot(cql, table, tag, skip_flush):
args = ['--tag', tag, '--table', cf]
if skip_flush:
args.append('--skip-flush')
if use_sstable_identifier:
args.append('--use-sstable-identifier')
args.append(ks)
run_nodetool(cql, "snapshot", *args)

View File

@@ -881,7 +881,7 @@ private:
_fd.start(
std::ref(_fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count()).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count(), gcfg.gossip_scheduling_group).get();
auto stop_fd = defer_verbose_shutdown("direct failure detector", [this] {
_fd.stop().get();

View File

@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
def check_snapshot_out(res, tag, ktlist, skip_flush):
def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=False):
"""Check that the output of nodetool snapshot contains the expected messages"""
if len(ktlist) == 0:
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush):
pattern = re.compile("Requested creating snapshot\\(s\\)"
f" for \\[{keyspaces}\\]"
f" with snapshot name \\[(.+)\\]"
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
f" and options \\{{skip_flush={str(skip_flush).lower()}, use_sstable_identifier={str(use_sstable_identifier).lower()}\\}}")
print(res)
print(pattern)
@@ -138,13 +138,13 @@ def test_snapshot_keyspace(nodetool):
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
@@ -155,13 +155,13 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
@@ -186,7 +186,7 @@ class kn_param(NamedTuple):
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
tag = "my_snapshot"
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
req_params = {"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": param.kn}
if param.cf:
req_params["cf"] = param.cf
@@ -202,19 +202,19 @@ def test_snapshot_ktlist(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1.tbl1,ks2.tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
@@ -229,7 +229,8 @@ def test_snapshot_ktlist(nodetool, option_name):
{"ks": ["ks1", "ks2"], "tbl": []},
])
@pytest.mark.parametrize("skip_flush", [False, True])
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
@pytest.mark.parametrize("use_sstable_identifier", [False, True])
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_identifier):
cmd = ["snapshot"]
params = {}
@@ -242,8 +243,11 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
if skip_flush:
cmd.append("--skip-flush")
if use_sstable_identifier:
cmd.append("--use-sstable-identifier")
params["sf"] = str(skip_flush).lower()
params["use_sstable_identifier"] = str(use_sstable_identifier).lower()
if ktlist:
if "tbl" in ktlist:
@@ -273,7 +277,7 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
expected_request("POST", "/storage_service/snapshots", params=params)
])
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush, use_sstable_identifier)
def test_snapshot_multiple_keyspace_with_table(nodetool):

View File

@@ -30,7 +30,7 @@ static const int cell_size = 128;
static bool cancelled = false;
template<typename MutationGenerator>
void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen, std::function<mutation()> before_flush = {}) {
tests::reader_concurrency_semaphore_wrapper semaphore;
cache_tracker tracker;
row_cache cache(s, make_empty_snapshot_source(), tracker, is_continuous::yes);
@@ -58,6 +58,10 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
return;
}
}
if (before_flush) {
mutation m = before_flush();
mt->apply(m);
}
});
memtable_slm.stop();
std::cout << format("Memtable fill took {:.6f} [ms], {}", fill_d.count() * 1000, memtable_slm) << std::endl;
@@ -181,6 +185,43 @@ static void test_partition_with_lots_of_small_rows() {
});
}
static void test_partition_with_lots_of_small_rows_covered_by_tombstone() {
auto s = schema_builder("ks", "cf")
.with_column("pk", uuid_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v1", bytes_type, column_kind::regular_column)
.with_column("v2", bytes_type, column_kind::regular_column)
.with_column("v3", bytes_type, column_kind::regular_column)
.build();
auto pk = dht::decorate_key(*s, partition_key::from_single_value(*s,
serialized(utils::UUID_gen::get_time_UUID())));
int ck_idx = 0;
int flush_ck_idx = 0;
run_test("Large partition, lots of small rows covered by single tombstone", s, [&] {
mutation m(s, pk);
auto val = data_value(bytes(bytes::initialized_later(), cell_size));
auto ck = clustering_key::from_single_value(*s, serialized(ck_idx++));
auto ts = api::new_timestamp();
m.set_clustered_cell(ck, "v1", val, ts);
m.set_clustered_cell(ck, "v2", val, ts);
m.set_clustered_cell(ck, "v3", val, ts);
return m;
}, [&] { // before_flush
// Delete key range [-inf, flush_ck_idx)
std::cout << "Generated " << (ck_idx - flush_ck_idx) << " rows\n";
auto m = mutation(s, pk);
auto ck = clustering_key::from_single_value(*s, serialized(flush_ck_idx));
m.partition().apply_row_tombstone(*s, range_tombstone(
position_in_partition_view::before_all_clustered_rows(),
position_in_partition_view::before_key(ck),
tombstone(api::new_timestamp(), gc_clock::now())));
flush_ck_idx = ck_idx;
return m;
});
}
static void test_partition_with_few_small_rows() {
auto s = schema_builder("ks", "cf")
.with_column("pk", uuid_type, column_kind::partition_key)
@@ -275,6 +316,7 @@ int scylla_row_cache_update_main(int argc, char** argv) {
cancelled = true;
});
logalloc::prime_segment_pool(memory::stats().total_memory(), memory::min_free_memory()).get();
test_partition_with_lots_of_small_rows_covered_by_tombstone();
test_small_partitions();
test_partition_with_few_small_rows();
test_partition_with_lots_of_small_rows();

View File

@@ -109,6 +109,7 @@ class ResourceGather(ABC):
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise

View File

@@ -31,7 +31,7 @@ struct test_pinger: public direct_failure_detector::pinger {
std::unordered_map<endpoint_id, size_t> _pings;
bool _block = false;
virtual future<bool> ping(endpoint_id ep, abort_source& as) override {
virtual future<bool> ping(endpoint_id ep, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override {
bool ret = false;
co_await invoke_abortable_on(0, [this, ep, &ret] (abort_source& as) -> future<> {
++_pings[ep];
@@ -91,6 +91,9 @@ struct test_clock : public direct_failure_detector::clock {
throw sleep_aborted{};
}
}
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const override {
throw std::logic_error("to_milliseconds is not implemented");
}
};
struct test_listener : public direct_failure_detector::listener {
@@ -129,7 +132,7 @@ SEASTAR_TEST_CASE(failure_detector_test) {
test_pinger pinger;
test_clock clock;
sharded<direct_failure_detector::failure_detector> fd;
co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30);
co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30, seastar::current_scheduling_group());
test_listener l1, l2;
auto sub1 = co_await fd.local().register_listener(l1, 95);

View File

@@ -1065,7 +1065,7 @@ public:
}
// Can be called on any shard.
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override {
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override {
try {
co_await invoke_abortable_on(0, [this, id] (abort_source& as) {
return _rpc.ping(raft::server_id{id}, as);
@@ -1127,6 +1127,10 @@ public:
throw sleep_aborted{};
}
}
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const override {
throw std::logic_error("to_milliseconds is not implemented");
}
};
class direct_fd_listener : public raft::failure_detector, public direct_failure_detector::listener {
@@ -1436,7 +1440,7 @@ public:
// _fd_service must be started before raft server,
// because as soon as raft server is started, it may start adding endpoints to the service.
// _fd_service is using _server's RPC, but not until the first endpoint is added.
co_await _fd_service->start(std::ref(*_fd_pinger), std::ref(*_fd_clock), fd_ping_period.count(), fd_ping_timeout.count());
co_await _fd_service->start(std::ref(*_fd_pinger), std::ref(*_fd_clock), fd_ping_period.count(), fd_ping_timeout.count(), seastar::current_scheduling_group());
_fd_subscription.emplace(co_await _fd_service->local().register_listener(*_fd_listener, _fd_convict_threshold.count()));
co_await _server->start();
}

View File

@@ -2362,16 +2362,23 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
params["sf"] = "false";
}
if (vm.contains("use-sstable-identifier")) {
params["use_sstable_identifier"] = "true";
} else {
params["use_sstable_identifier"] = "false";
}
client.post("/storage_service/snapshots", params);
if (kn_msg.empty()) {
kn_msg = params["kn"];
}
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skip_flush={}, use_sstable_identifier={}}}\n",
kn_msg,
params["tag"],
params["sf"]);
params["sf"],
params["use_sstable_identifier"]);
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
}
@@ -4598,6 +4605,7 @@ For more information, see: {}
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
typed_option<sstring>("tag,t", "The name of the snapshot"),
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
typed_option<>("use-sstable-identifier", "Use the sstable identifier UUID, if available, rather than the sstable generation for the sstable file names within the snapshot dir and the manifest file"),
},
{
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),

View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <fmt/format.h>
#include <functional>
/// Type-erased formatter.
/// Allows passing formattable objects without exposing their types.
class abstract_formatter {
std::function<void(fmt::format_context&)> _formatter;
public:
abstract_formatter() = default;
template<typename Func>
requires std::is_invocable_v<Func, fmt::format_context&>
explicit abstract_formatter(Func&& f) : _formatter(std::forward<Func>(f)) {}
fmt::format_context::iterator format_to(fmt::format_context& ctx) const {
if (_formatter) {
_formatter(ctx);
}
return ctx.out();
}
explicit operator bool() const noexcept { return bool(_formatter); }
};
template <> struct fmt::formatter<abstract_formatter> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const abstract_formatter& formatter, fmt::format_context& ctx) const {
return formatter.format_to(ctx);
}
};

View File

@@ -461,6 +461,9 @@ public:
, _metrics(m)
, _lru(l)
, _region(reg)
, _as(abstract_formatter([this] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "cached_file {}", _file_name);
}))
, _cache(page_idx_less_comparator())
, _size(size)
{

View File

@@ -2948,10 +2948,10 @@ void allocating_section::on_alloc_failure(logalloc::region& r) {
r.allocator().invalidate_references();
if (r.get_tracker().get_impl().segment_pool().allocation_failure_flag()) {
_lsa_reserve *= 2;
llogger.info("LSA allocation failure, increasing reserve in section {} to {} segments; trace: {}", fmt::ptr(this), _lsa_reserve, current_backtrace());
llogger.info("LSA allocation failure, increasing reserve in section {} ({}) to {} segments; trace: {}", fmt::ptr(this), _name, _lsa_reserve, current_backtrace());
} else {
_std_reserve *= 2;
llogger.info("Standard allocator failure, increasing head-room in section {} to {} [B]; trace: {}", fmt::ptr(this), _std_reserve, current_backtrace());
llogger.info("Standard allocator failure, increasing head-room in section {} ({}) to {} [B]; trace: {}", fmt::ptr(this), _name, _std_reserve, current_backtrace());
}
reserve(r.get_tracker().get_impl());
}

View File

@@ -17,6 +17,7 @@
#include "utils/assert.hh"
#include "utils/entangled.hh"
#include "utils/memory_limit_reached.hh"
#include "utils/abstract_formatter.hh"
namespace logalloc {
@@ -442,6 +443,7 @@ class allocating_section {
size_t _minimum_lsa_emergency_reserve = 0;
int64_t _remaining_std_bytes_until_decay = s_bytes_per_decay;
int _remaining_lsa_segments_until_decay = s_segments_per_decay;
abstract_formatter _name;
private:
struct guard {
tracker::impl& _tracker;
@@ -453,6 +455,8 @@ private:
void maybe_decay_reserve() noexcept;
void on_alloc_failure(logalloc::region&);
public:
allocating_section() = default;
explicit allocating_section(abstract_formatter name) : _name(std::move(name)) {}
void set_lsa_reserve(size_t) noexcept;
void set_std_reserve(size_t) noexcept;