Compare commits

...

33 Commits

Author SHA1 Message Date
Avi Kivity
f3297824e3 Revert "config: decrease default large allocation warning threshold to 128k"
This reverts commit 04fb2c026d. 2025.3 got
the reduced threshold, but won't get many of the fixes the warning will
generate, leaving it very noisy. Better to avoid the noise for this release.

Fixes #24384.
2025-07-10 14:12:14 +03:00
Avi Kivity
4eb220d3ab service: tablet_allocator: avoid large contiguous vector in make_repair_plan()
make_repair_plan() allocates a temporary vector which can grow larger
than our 128k basic allocation unit. Use a chunked vector to avoid
stalls due to large allocations.

Fixes #24713.

Closes scylladb/scylladb#24801

(cherry picked from commit 0138afa63b)

Closes scylladb/scylladb#24902
2025-07-10 12:41:35 +03:00
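The fix above swaps a flat vector for a chunked one. A minimal sketch of the chunked-vector idea (an illustration only, not ScyllaDB's actual `utils::chunked_vector`): elements live in fixed-size chunks, so no single allocation grows with the element count, and only the small vector of chunk pointers is contiguous.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Illustrative chunked vector: each data allocation is capped at
// ChunkBytes, avoiding one large contiguous buffer for N elements.
template <typename T, std::size_t ChunkBytes = 128 * 1024>
class chunked_vector {
    static constexpr std::size_t chunk_elems = ChunkBytes / sizeof(T);
    std::vector<std::unique_ptr<T[]>> _chunks; // only this stays contiguous
    std::size_t _size = 0;
public:
    void push_back(const T& v) {
        if (_size % chunk_elems == 0) {
            // Allocate one more fixed-size chunk instead of reallocating
            // and copying a single ever-growing buffer.
            _chunks.push_back(std::make_unique<T[]>(chunk_elems));
        }
        _chunks[_size / chunk_elems][_size % chunk_elems] = v;
        ++_size;
    }
    T& operator[](std::size_t i) {
        return _chunks[i / chunk_elems][i % chunk_elems];
    }
    std::size_t size() const { return _size; }
};
```

With `ChunkBytes` set to the 128k basic allocation unit, pushing any number of elements never triggers a single allocation larger than one chunk.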
Patryk Jędrzejczak
c9de7d68f2 Merge '[Backport 2025.3] Make it easier to debug stuck raft topology operation.' from Scylladb[bot]
The series adds more logging and provides a new REST API around topology command RPC execution to allow easier debugging of stuck topology operations.

Backport since we want to have it in production as quickly as possible.

Fixes #24860

- (cherry picked from commit c8ce9d1c60)

- (cherry picked from commit 4e6369f35b)

Parent PR: #24799

Closes scylladb/scylladb#24881

* https://github.com/scylladb/scylladb:
  topology coordinator: log a start and an end of topology coordinator command execution at info level
  topology coordinator: add REST endpoint to query the status of ongoing topology cmd rpc
2025-07-09 12:55:48 +02:00
Piotr Dulikowski
b535f44db2 Merge '[Backport 2025.3] batchlog_manager: abort replay of a failed batch on shutdown or node down' from Scylladb[bot]
When replaying a failed batch and sending the mutation to all replicas, make the write response handler cancellable and abort it on shutdown or if some target is marked down. Also, set a reasonable timeout so it gets aborted if it's stuck for some other unexpected reason.

Previously, the write response handler was not cancellable and had no timeout. This could cause a scenario where some write operation by the batchlog manager got stuck indefinitely, and node shutdown got stuck as well because it waited for the batchlog manager to complete, without aborting the operation.

Backport to relevant versions since the issue can cause node shutdown to hang.

Fixes scylladb/scylladb#24599

- (cherry picked from commit 8d48b27062)

- (cherry picked from commit fc5ba4a1ea)

- (cherry picked from commit 7150632cf2)

- (cherry picked from commit 74a3fa9671)

- (cherry picked from commit a9b476e057)

- (cherry picked from commit d7af26a437)

Parent PR: #24595

Closes scylladb/scylladb#24882

* github.com:scylladb/scylladb:
  test: test_batchlog_manager: batchlog replay includes cdc
  test: test_batchlog_manager: test batch replay when a node is down
  batchlog_manager: set timeout on writes
  batchlog_manager: abort writes on shutdown
  batchlog_manager: create cancellable write response handler
  storage_proxy: add write type parameter to mutate_internal
2025-07-08 12:35:55 +02:00
Michael Litvak
ec1dd1bf31 test: test_batchlog_manager: batchlog replay includes cdc
Add a new test that verifies that when replaying batch mutations from
the batchlog, the mutations include CDC augmentation if needed.

This is done in order to verify that it works currently as expected and
doesn't break in the future.

(cherry picked from commit d7af26a437)
2025-07-08 06:25:36 +00:00
Michael Litvak
7b30f487dd test: test_batchlog_manager: test batch replay when a node is down
Add a test of the batchlog manager replay loop applying failed batches
while some replica is down.

The test reproduces an issue where the batchlog manager tries to replay
a failed batch, doesn't get a response from some replica, and becomes
stuck.

It verifies that the batchlog manager can eventually recover from this
situation and continue applying failed batches.

(cherry picked from commit a9b476e057)
2025-07-08 06:25:36 +00:00
Michael Litvak
c3c489d3d4 batchlog_manager: set timeout on writes
Set a timeout on writes of replayed batches by the batchlog manager.

We want to avoid an infinite timeout for the writes in case they get
stuck for some unexpected reason.

The timeout is set to be high enough to allow any reasonable write to
complete.

(cherry picked from commit 74a3fa9671)
2025-07-08 06:25:36 +00:00
Michael Litvak
6fb6bb8dc7 batchlog_manager: abort writes on shutdown
On shutdown of batchlog manager, abort all writes of replayed batches
by the batchlog manager.

To achieve this we set the appropriate write_type to BATCH, and on
shutdown cancel all write handlers with this type.

(cherry picked from commit 7150632cf2)
2025-07-08 06:25:36 +00:00
Michael Litvak
02c038efa8 batchlog_manager: create cancellable write response handler
When replaying a batch mutation from the batchlog manager and sending it
to all replicas, create the write response handler as cancellable.

To achieve this we define a new wrapper type for batchlog mutations -
batchlog_replay_mutation, and this allows us to overload
create_write_response_handler for this type. This is similar to how it's
done with hint_wrapper and read_repair_mutation.

(cherry picked from commit fc5ba4a1ea)
2025-07-08 06:25:36 +00:00
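The commit above relies on a common C++ idiom: wrapping the same payload in a distinct type so overload resolution picks different handler behavior, the same trick used for hint_wrapper and read_repair_mutation. A hedged sketch with illustrative stand-in names (not ScyllaDB's actual signatures):

```cpp
#include <cassert>
#include <string>

// Stand-in payload type.
struct mutation { int id; };

// Distinct wrapper type for batchlog replay; carries the same payload
// but selects a different overload below.
struct batchlog_replay_mutation { mutation m; };

std::string create_write_response_handler(const mutation&) {
    return "default handler";      // not cancellable
}

std::string create_write_response_handler(const batchlog_replay_mutation&) {
    return "cancellable handler";  // chosen for batchlog replay writes
}
```

The wrapper costs nothing at runtime; it only steers which `create_write_response_handler` overload the compiler selects.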
Michael Litvak
d3175671b7 storage_proxy: add write type parameter to mutate_internal
Currently mutate_internal has a boolean parameter `counter_write` that
indicates whether the write is of counter type or not.

We replace it with a more general parameter that allows indicating the
write type.

It is compatible with the previous behavior - for a counter write, the
type COUNTER is passed, and otherwise a default value will be used
as before.

(cherry picked from commit 8d48b27062)
2025-07-08 06:25:36 +00:00
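The refactor described above, generalizing a boolean flag into a write-type value while preserving old call-site behavior, can be sketched as follows (enumerator and function names are illustrative, not the actual storage_proxy code):

```cpp
#include <cassert>

// A write-type enum generalizes the old boolean `counter_write` flag.
enum class write_type { SIMPLE, COUNTER, BATCH };

// Mapping the legacy flag this way keeps behavior identical:
// COUNTER for counter writes, the default otherwise.
write_type write_type_from_counter_flag(bool counter_write) {
    return counter_write ? write_type::COUNTER : write_type::SIMPLE;
}
```

The enum leaves room for further types such as BATCH, which the later batchlog commits use to find and cancel in-flight replay writes.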
Gleb Natapov
4651c44747 topology coordinator: log a start and an end of topology coordinator command execution at info level
Those calls are relatively rare and the output may help analyze issues
in production.

(cherry picked from commit 4e6369f35b)
2025-07-08 06:24:22 +00:00
Gleb Natapov
0e67f6f6c2 topology coordinator: add REST endpoint to query the status of ongoing topology cmd rpc
The topology coordinator executes several topology cmd RPCs against some nodes
during a topology change. A topology operation will not proceed unless
the RPC completes (successfully or not), but sometimes it appears that it
hangs and it is hard to tell on which nodes it did not complete yet.
Introduce a new REST endpoint that can help with debugging such cases.
If executed on the topology coordinator it returns the currently running
topology RPC (if any) and a list of nodes that did not reply yet.

(cherry picked from commit c8ce9d1c60)
2025-07-08 06:24:21 +00:00
Avi Kivity
859d9dd3b1 Merge '[Backport 2025.3] Improve background disposal of tablet_metadata' from Scylladb[bot]
As seen in #23284, when the tablet_metadata contains many tables, even empty ones,
we're seeing a long queue of seastar tasks coming from the individual destruction of
`tablet_map_ptr = foreign_ptr<lw_shared_ptr<const tablet_map>>`.

This change improves `tablet_metadata::clear_gently` to destroy the `tablet_map_ptr` objects
on their owner shard by sorting them into vectors, one per owner shard.

Also, a background call to clear_gently was added to `~token_metadata`, as it is destroyed
arbitrarily when automatic token_metadata_ptr variables go out of scope, so that the
contained tablet_metadata would be cleared gently.

Finally, a unit test was added to reproduce the `Too long queue accumulated for gossip` symptom
and verify that it is gone with this change.

Fixes #24814
Refs #23284

This change is not marked as fixing the issue since we still need to verify that there is no impact on query performance, reactor stalls, or large allocations, with a large number of tablet-based tables.

* Since the issue exists in 2025.1, requesting backport to 2025.1 and upwards

- (cherry picked from commit 3acca0aa63)

- (cherry picked from commit 493a2303da)

- (cherry picked from commit e0a19b981a)

- (cherry picked from commit 2b2cfaba6e)

- (cherry picked from commit 2c0bafb934)

- (cherry picked from commit 4a3d14a031)

- (cherry picked from commit 6e4803a750)

Parent PR: #24618

Closes scylladb/scylladb#24864

* github.com:scylladb/scylladb:
  token_metadata_impl: clear_gently: release version tracker early
  test: cluster: test_tablets_merge: add test_tablet_split_merge_with_many_tables
  token_metadata: clear_and_destroy_impl when destroyed
  token_metadata: keep a reference to shared_token_metadata
  token_metadata: move make_token_metadata_ptr into shared_token_metadata class
  replica: database: get and expose a mutable locator::shared_token_metadata
  locator: tablets: tablet_metadata: clear_gently: optimize foreign ptr destruction
2025-07-07 14:02:19 +03:00
Gleb Natapov
a25bd068bf topology coordinator: do not set request_type field for truncation command if topology_global_request_queue feature is not enabled yet
Old nodes do not expect global topology request names to be in the
request_type field, so set it only if the cluster is fully upgraded
already.

Closes scylladb/scylladb#24731

(cherry picked from commit ca7837550d)

Closes scylladb/scylladb#24833
2025-07-07 11:50:55 +02:00
Benny Halevy
9bc487e79e token_metadata_impl: clear_gently: release version tracker early
No need to wait for all members to be cleared gently.
We can release the version earlier since the
held version may be waited on in barriers.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 6e4803a750)
2025-07-07 09:42:29 +03:00
Benny Halevy
41dc86ffa8 test: cluster: test_tablets_merge: add test_tablet_split_merge_with_many_tables
Reproduces #23284

Currently skipped in release mode since it requires
the `short_tablet_stats_refresh_interval` interval.
Ref #24641

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 4a3d14a031)
2025-07-07 09:42:26 +03:00
Benny Halevy
f78a352a29 token_metadata: clear_and_destroy_impl when destroyed
We have a lot of places in the code where
a token_metadata_ptr is kept in an automatic
variable and destroyed when it leaves the scope.
Since it's a reference-counted lw_shared_ptr,
the token_metadata object is rarely destroyed in
those cases, but when it is, it doesn't go through
clear_gently, and in particular its tablet_metadata
is not cleared gently, leading to inefficient destruction
of potentially many foreign_ptr:s.

This patch calls clear_and_destroy_impl that gently
clears and destroys the impl object in the background
using the shared_token_metadata.

Fixes #13381

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 2c0bafb934)
2025-07-07 09:38:17 +03:00
Benny Halevy
b647dbd547 token_metadata: keep a reference to shared_token_metadata
To be used by a following patch to gently clean and destroy
the token_metadata_impl in the background.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 2b2cfaba6e)
2025-07-07 09:34:10 +03:00
Benny Halevy
0e7d3b4eb9 token_metadata: move make_token_metadata_ptr into shared_token_metadata class
So we can use the local shared_token_metadata instance
for safe background destruction of token_metadata_impl:s.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit e0a19b981a)
2025-07-07 09:30:01 +03:00
Benny Halevy
c8043e05c1 replica: database: get and expose a mutable locator::shared_token_metadata
Prepare for the next patch, which will use this shared_token_metadata
to make mutable_token_metadata_ptr:s.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 493a2303da)
2025-07-07 09:27:06 +03:00
Benny Halevy
54fb9ed03b locator: tablets: tablet_metadata: clear_gently: optimize foreign ptr destruction
Sort all tablet_map_ptr:s by shard_id
and then destroy them on each shard to prevent
long cross-shard task queues for foreign_ptr destructions.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 3acca0aa63)
2025-07-07 09:27:01 +03:00
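The bucketing step described in the commit above, grouping items by their owner shard so each shard destroys only its own batch instead of receiving one cross-shard task per item, can be sketched like this. `owned_ptr` and `bucket_by_shard` are illustrative stand-ins for `tablet_map_ptr` and the real clear_gently logic:

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Stand-in for a foreign pointer that remembers which shard owns it.
struct owned_ptr { unsigned owner_shard; };

// Group pointers into per-shard buckets; the caller can then issue one
// submit_to()-style call per shard, freeing each bucket on its owner,
// instead of one cross-shard destruction task per pointer.
std::vector<std::vector<owned_ptr>> bucket_by_shard(std::vector<owned_ptr> ptrs,
                                                    unsigned shard_count) {
    std::vector<std::vector<owned_ptr>> per_shard(shard_count);
    for (auto& p : ptrs) {
        per_shard[p.owner_shard].push_back(std::move(p));
    }
    return per_shard;
}
```

This turns O(number of tablet maps) cross-shard tasks into O(number of shards), which is what removes the long task queue the issue describes.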
Avi Kivity
f60c54df77 storage_proxy: avoid large allocation when storing batch in system.batchlog
Currently, when computing the mutation to be stored in system.batchlog,
we go through data_value. In turn this goes through the `bytes` type
(#24810), so it causes a large contiguous allocation if the batch is
large.

Fix by going through the more primitive, but less contiguous,
atomic_cell API.

Fixes #24809.

Closes scylladb/scylladb#24811

(cherry picked from commit 60f407bff4)

Closes scylladb/scylladb#24846
2025-07-05 00:37:09 +03:00
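The contiguous-vs-fragmented distinction behind this fix can be illustrated with a small sketch: a `bytes`-like value needs one allocation of the full payload size, while a fragmented representation caps every allocation at a fixed chunk. This only mimics the shape of the idea; it is not the actual atomic_cell API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Cap per-allocation size at the 128k basic allocation unit.
constexpr std::size_t max_chunk = 128 * 1024;

// Split a large payload into chunk-sized fragments: the total stored is
// the same, but no single allocation exceeds max_chunk bytes.
std::vector<std::string> fragment(const std::string& payload) {
    std::vector<std::string> frags;
    for (std::size_t off = 0; off < payload.size(); off += max_chunk) {
        frags.push_back(payload.substr(off, max_chunk)); // each <= 128 KiB
    }
    return frags;
}
```

A 300 KiB batch would become three fragments (128 KiB, 128 KiB, 44 KiB) instead of one 300 KiB contiguous allocation that trips the large-allocation warning.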
Patryk Jędrzejczak
f1ec51133e docs: handling-node-failures: fix typo
The word "from" was incorrect. The typo comes from the recently
merged #24583.

Fixes #24732

Requires backport to 2025.2 since #24583 has been backported to 2025.2.

Closes scylladb/scylladb#24733

(cherry picked from commit fa982f5579)

Closes scylladb/scylladb#24832
2025-07-04 19:35:00 +02:00
Jenkins Promoter
648fe6a4e8 Update ScyllaDB version to: 2025.3.0-rc1 2025-07-03 11:35:01 +03:00
Michał Chojnowski
1bd536a228 utils/alien_worker: fix a data race in submit()
We move a `seastar::promise` on the external worker thread,
after the matching `seastar::future` was returned to the shard.

That's illegal. If the `promise` move occurs concurrently with some
operation (move, await) on the `future`, it becomes a data race
which could cause various kinds of corruption.

This patch fixes that by keeping the promise at a stable address
on the shard (inside a coroutine frame) and only passing a pointer
to it through the worker.

Fixes #24751

Closes scylladb/scylladb#24752

(cherry picked from commit a29724479a)

Closes scylladb/scylladb#24780
2025-07-03 10:45:51 +03:00
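The shape of this fix can be shown with standard-library types. Note the hedge: `std::promise` tolerates cross-thread `set_value`, unlike `seastar::promise`, so this only illustrates the ownership pattern — the promise stays at a stable address on the caller's side, and only a pointer travels to the worker thread, so nothing ever moves the promise concurrently with operations on the future.

```cpp
#include <future>
#include <thread>

// The caller owns the promise at a stable address (in the real patch,
// inside a coroutine frame); the worker receives only a pointer and
// fulfils the promise without ever moving it.
int call_on_worker() {
    std::promise<int> p;                 // stable address, owned by caller
    std::future<int> f = p.get_future();
    std::thread worker([ptr = &p] {      // worker sees only a pointer
        ptr->set_value(42);              // fulfil without moving the promise
    });
    int result = f.get();
    worker.join();
    return result;
}
```

The buggy version moved the promise into the worker, so a concurrent move/await on the returned future on the shard side raced with the move on the worker thread.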
Avi Kivity
d5b11098e8 repair: row_level: unstall to_repair_rows_on_wire() destroying its input
to_repair_rows_on_wire() moves the contents of its input std::list
and is careful to yield after each element, but the final destruction
of the input list still deals with all of the list elements without
yielding. This is expensive as not all contents of repair_row are moved
(_dk_with_hash is of type lw_shared_ptr<const decorated_key_with_hash>).

To fix, destroy each row element as we move along. This is safe as we
own the input and don't reference row_list other than for the iteration.

Fixes #24725.

Closes scylladb/scylladb#24726

(cherry picked from commit 6aa71205d8)

Closes scylladb/scylladb#24771
2025-07-03 10:44:58 +03:00
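The pattern in this fix, consuming a `std::list` by moving each element out and erasing its node immediately so the list's final destruction has nothing left to free in one non-yielding burst, looks roughly like this sketch (element type illustrative; the real coroutine code would yield after each element):

```cpp
#include <cstddef>
#include <list>
#include <string>

// Drain the list as we iterate: after the loop, the list is already
// empty, so its destructor does no bulk work.
std::size_t drain(std::list<std::string>& rows) {
    std::size_t processed = 0;
    for (auto it = rows.begin(); it != rows.end(); ) {
        std::string row = std::move(*it); // take ownership of the element
        it = rows.erase(it);              // free its node right away
        ++processed;                      // (real code would co_await here)
    }
    return processed;
}
```

This is safe because the function owns the input and never references the list other than through the iterator, mirroring the commit's reasoning.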
Tomasz Grabiec
775916132e Merge '[Backport 2025.3] repair: postpone repair until topology is not busy ' from Scylladb[bot]
Currently, repair_service::repair_tablets starts repair if there
are no ongoing tablet operations. The check does not consider global
topology operations, like tablet resize finalization.

Hence, if:
- topology is in the tablet_resize_finalization state;
- repair starts (as there is no tablet transitions) and holds the erm;
- resize finalization finishes;

then the repair sees a topology state different from the actual one -
it does not see that the storage groups were already split.
Repair code does not handle this case and it results in
on_internal_error.

Start repair only when the topology is not busy. The check isn't atomic,
as it's done on shard 0. Thus, we compare the topology versions
to ensure that the busyness check is valid.

Fixes: https://github.com/scylladb/scylladb/issues/24195.

Needs backport to all branches since they are affected

- (cherry picked from commit df152d9824)

- (cherry picked from commit 83c9af9670)

Parent PR: #24202

Closes scylladb/scylladb#24783

* github.com:scylladb/scylladb:
  test: add test for repair and resize finalization
  repair: postpone repair until topology is not busy
2025-07-02 13:17:08 +02:00
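The validation step this merge describes, the busyness check runs on shard 0 and is not atomic, so the topology version is compared before and after to confirm the check is still valid, can be sketched as follows. Types and field names are illustrative, not the repair_service code:

```cpp
#include <cstdint>

// Snapshot of what the check observes about the topology.
struct topology_snapshot {
    uint64_t version;
    bool busy;
};

// Repair may start only if the topology was idle at check time and its
// version did not move between the check and the start of repair; a
// version bump means the non-atomic check may no longer be valid.
bool may_start_repair(const topology_snapshot& at_check,
                      const topology_snapshot& at_start) {
    return !at_check.busy && at_check.version == at_start.version;
}
```

On a failed comparison the caller would postpone and retry the check rather than proceed with a possibly stale view of the storage groups.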
Jenkins Promoter
76bf279e0e Update pgo profiles - aarch64 2025-07-02 13:06:18 +03:00
Jenkins Promoter
61364624e3 Update pgo profiles - x86_64 2025-07-02 12:34:58 +03:00
Botond Dénes
6e6c00dcfe docs: cql/types.rst: remove reference to frozen-only UDTs
ScyllaDB has supported non-frozen UDTs since 3.2; no need to keep referencing
this limitation in the current docs. Replace the description of the
limitation with a general description of frozen semantics for UDTs.

Fixes: #22929

Closes scylladb/scylladb#24763

(cherry picked from commit 37ef9efb4e)

Closes scylladb/scylladb#24784
2025-07-02 12:11:25 +03:00
Aleksandra Martyniuk
c26eb8ef14 test: add test for repair and resize finalization
Add a test that checks that repair does not start if there is an
ongoing resize finalization.

(cherry picked from commit 83c9af9670)
2025-07-01 20:26:53 +00:00
Aleksandra Martyniuk
8a1d09862e repair: postpone repair until topology is not busy
Currently, repair_service::repair_tablets starts repair if there
are no ongoing tablet operations. The check does not consider global
topology operations, like tablet resize finalization. This may cause
a data race and unexpected behavior.

Start repair when topology is not busy.

(cherry picked from commit df152d9824)
2025-07-01 20:26:53 +00:00
Yaron Kaikov
e64bb3819c Update ScyllaDB version to: 2025.3.0-rc0 2025-07-01 10:34:39 +03:00
35 changed files with 664 additions and 130 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
-VERSION=2025.3.0-dev
+VERSION=2025.3.0-rc1
if test -f version
then

View File

@@ -3161,6 +3161,22 @@
]
}
]
+},
+{
+"path":"/storage_service/raft_topology/cmd_rpc_status",
+"operations":[
+{
+"method":"GET",
+"summary":"Get information about currently running topology cmd rpc",
+"type":"string",
+"nickname":"raft_topology_get_cmd_status",
+"produces":[
+"application/json"
+],
+"parameters":[
+]
+}
+]
}
],
"models":{

View File

@@ -1670,6 +1670,18 @@ rest_raft_topology_upgrade_status(sharded<service::storage_service>& ss, std::un
co_return sstring(format("{}", ustate));
}
+static
+future<json::json_return_type>
+rest_raft_topology_get_cmd_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
+const auto status = co_await ss.invoke_on(0, [] (auto& ss) {
+return ss.get_topology_cmd_status();
+});
+if (status.active_dst.empty()) {
+co_return sstring("none");
+}
+co_return sstring(fmt::format("{}[{}]: {}", status.current, status.index, fmt::join(status.active_dst, ",")));
+}
static
future<json::json_return_type>
rest_move_tablet(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
@@ -1902,6 +1914,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client));
ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss));
ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss));
+ss::raft_topology_get_cmd_status.set(r, rest_bind(rest_raft_topology_get_cmd_status, ss));
ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss));
ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss));
ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss));

View File

@@ -245,12 +245,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
qp.db().real_database().validate_keyspace_update(*ks_md_update);
service::topology_mutation_builder builder(ts);
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
if (!qp.proxy().features().topology_global_request_queue) {
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(global_request_id);
builder.set_new_keyspace_rf_change_data(_name, ks_options);
} else {
builder.queue_global_topology_request_id(global_request_id);
rtbuilder.set("request_type", service::global_topology_request::keyspace_rf_change)
.set_new_keyspace_rf_change_data(_name, ks_options);
};
service::topology_change change{{builder.build()}};
@@ -259,13 +265,6 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
return cm.to_mutation(topo_schema);
});
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
rtbuilder.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", service::global_topology_request::keyspace_rf_change);
if (qp.proxy().features().topology_global_request_queue) {
rtbuilder.set_new_keyspace_rf_change_data(_name, ks_options);
}
service::topology_change req_change{{rtbuilder.build()}};
auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);

View File

@@ -36,7 +36,7 @@
static logging::logger blogger("batchlog_manager");
-const uint32_t db::batchlog_manager::replay_interval;
+const std::chrono::seconds db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
@@ -116,7 +116,8 @@ future<> db::batchlog_manager::batchlog_replay_loop() {
} catch (...) {
blogger.error("Exception in batch replay: {}", std::current_exception());
}
-delay = std::chrono::milliseconds(replay_interval);
+delay = utils::get_local_injector().is_enabled("short_batchlog_manager_replay_interval") ?
+std::chrono::seconds(1) : replay_interval;
}
}
@@ -132,6 +133,8 @@ future<> db::batchlog_manager::drain() {
_sem.broken();
}
co_await _qp.proxy().abort_batch_writes();
co_await std::move(_loop_done);
blogger.info("Drained");
}
@@ -173,6 +176,11 @@ future<> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cle
return make_ready_future<stop_iteration>(stop_iteration::no);
}
+if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
+blogger.debug("Skipping batch replay due to skip_batch_replay injection");
+return make_ready_future<stop_iteration>(stop_iteration::no);
+}
// check version of serialization format
if (!row.has("version")) {
blogger.warn("Skipping logged batch because of unknown version");
@@ -242,7 +250,8 @@ future<> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cle
// send to partially or wholly fail in actually sending stuff. Since we don't
// have hints (yet), send with CL=ALL, and hope we can re-do this soon.
// See below, we use retry on write failure.
-return _qp.proxy().mutate(mutations, db::consistency_level::ALL, db::no_timeout, nullptr, empty_service_permit(), db::allow_per_partition_rate_limit::no);
+auto timeout = db::timeout_clock::now() + write_timeout;
+return _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
});
}).then_wrapped([this, id](future<> batch_result) {
try {

View File

@@ -43,8 +43,9 @@ public:
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
private:
-static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds
+static constexpr std::chrono::seconds replay_interval = std::chrono::seconds(60);
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
+static constexpr std::chrono::seconds write_timeout = std::chrono::seconds(300);
using clock_type = lowres_clock;

View File

@@ -1230,7 +1230,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default)"
"bytes written to data file. Value must be between 0 and 1.")
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .2, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
-, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, (size_t(128) << 10) + 1, "Warn about memory allocations above this size; set to zero to disable.")
+, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.")
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.")
, enable_node_aggregated_table_metrics(this, "enable_node_aggregated_table_metrics", value_status::Used, true, "Enable aggregated per node, per keyspace and per table metrics reporting, applicable if enable_keyspace_column_family_metrics is false.")

View File

@@ -481,7 +481,8 @@ Creating a new user-defined type is done using a ``CREATE TYPE`` statement defin
field_definition: `identifier` `cql_type`
A UDT has a name (``udt_name``), which is used to declare columns of that type and is a set of named and typed fields. The ``udt_name`` can be any
-type, including collections or other UDTs. UDTs and collections inside collections must always be frozen (no matter which version of ScyllaDB you are using).
+type, including collections or other UDTs.
+Similar to collections, a UDT can be frozen or non-frozen. A frozen UDT is immutable and can only be updated as a whole. Nested UDTs or UDTs used in keys must always be frozen.
For example::
@@ -506,26 +507,15 @@ For example::
CREATE TABLE superheroes (
name frozen<full_name> PRIMARY KEY,
-home frozen<address>
+home address
);
.. note::
- Attempting to create an already existing type will result in an error unless the ``IF NOT EXISTS`` option is used. If it is used, the statement will be a no-op if the type already exists.
- A type is intrinsically bound to the keyspace in which it is created and can only be used in that keyspace. At creation, if the type name is prefixed by a keyspace name, it is created in that keyspace. Otherwise, it is created in the current keyspace.
- As of ScyllaDB Open Source 3.2, UDTs not inside collections do not have to be frozen, but in all versions prior to ScyllaDB Open Source 3.2, and in all ScyllaDB Enterprise versions, UDTs **must** be frozen.
A non-frozen UDT example with ScyllaDB Open Source 3.2 and higher::
CREATE TYPE ut (a int, b int);
CREATE TABLE cf (a int primary key, b ut);
Same UDT in versions prior::
CREATE TYPE ut (a int, b int);
CREATE TABLE cf (a int primary key, b frozen<ut>);
UDT literals
~~~~~~~~~~~~

View File

@@ -157,7 +157,7 @@ will leave the recovery mode and remove the obsolete internal Raft data.
After completing this step, Raft should be fully functional.
-#. Replace all dead nodes from the cluster using the
+#. Replace all dead nodes in the cluster using the
:doc:`node replacement procedure </operating-scylla/procedures/cluster-management/replace-dead-node/>`.
.. note::

View File

@@ -335,18 +335,25 @@ void tablet_metadata::drop_tablet_map(table_id id) {
}
future<> tablet_metadata::clear_gently() {
-for (auto&& [id, map] : _tablets) {
-const auto shard = map.get_owner_shard();
-co_await smp::submit_to(shard, [map = std::move(map)] () mutable {
-auto map_ptr = map.release();
-// Others copies exist, we simply drop ours, no need to clear anything.
-if (map_ptr.use_count() > 1) {
-return make_ready_future<>();
-}
-return const_cast<tablet_map&>(*map_ptr).clear_gently().finally([map_ptr = std::move(map_ptr)] { });
-});
+tablet_logger.debug("tablet_metadata::clear_gently {}", fmt::ptr(this));
+// First, Sort the tablet maps per shard to avoid destruction of all foreign tablet map ptrs
+// on this shard. We don't use sharded<> here since it will require a similar
+// submit_to to each shard owner per tablet-map.
+std::vector<std::vector<tablet_map_ptr>> tablet_maps_per_shard;
+tablet_maps_per_shard.resize(smp::count);
+for (auto& [_, map_ptr] : _tablets) {
+tablet_maps_per_shard[map_ptr.get_owner_shard()].emplace_back(std::move(map_ptr));
+}
+_tablets.clear();
+// Now destroy the foreign tablet map pointers on each shard.
+co_await smp::invoke_on_all([&] -> future<> {
+for (auto& map_ptr : tablet_maps_per_shard[this_shard_id()]) {
+auto map = map_ptr.release();
+co_await utils::clear_gently(map);
+}
+});
+co_return;
}

View File

@@ -357,6 +357,7 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_tok
}
future<> token_metadata_impl::clear_gently() noexcept {
+_version_tracker = {};
co_await utils::clear_gently(_token_to_endpoint_map);
co_await utils::clear_gently(_normal_token_owners);
co_await utils::clear_gently(_bootstrap_tokens);
@@ -834,16 +835,30 @@ token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
{
}
-token_metadata::token_metadata(config cfg)
-: _impl(std::make_unique<token_metadata_impl>(cfg))
+token_metadata::token_metadata(shared_token_metadata& stm, config cfg)
+: _shared_token_metadata(&stm)
+, _impl(std::make_unique<token_metadata_impl>(std::move(cfg)))
{
}
-token_metadata::~token_metadata() = default;
+token_metadata::~token_metadata() {
+clear_and_dispose_impl();
+}
token_metadata::token_metadata(token_metadata&&) noexcept = default;
-token_metadata& token_metadata::token_metadata::operator=(token_metadata&&) noexcept = default;
+token_metadata& token_metadata::token_metadata::operator=(token_metadata&& o) noexcept {
+if (this != &o) {
+clear_and_dispose_impl();
+_shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr);
+_impl = std::exchange(o._impl, nullptr);
+}
+return *this;
+}
+void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) {
+_shared_token_metadata = &stm;
+}
const std::vector<token>&
token_metadata::sorted_tokens() const {
@@ -1027,6 +1042,15 @@ token_metadata::clone_after_all_left() const noexcept {
co_return token_metadata(co_await _impl->clone_after_all_left());
}
+void token_metadata::clear_and_dispose_impl() noexcept {
+if (!_shared_token_metadata) {
+return;
+}
+if (auto impl = std::exchange(_impl, nullptr)) {
+_shared_token_metadata->clear_and_dispose(std::move(impl));
+}
+}
future<> token_metadata::clear_gently() noexcept {
return _impl->clear_gently();
}
@@ -1143,6 +1167,17 @@ version_tracker shared_token_metadata::new_tracker(token_metadata::version_t ver
return tracker;
}
+future<> shared_token_metadata::stop() noexcept {
+co_await _background_dispose_gate.close();
+}
+void shared_token_metadata::clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept {
+// Safe to drop the future since the gate is closed in stop()
+if (auto gh = _background_dispose_gate.try_hold()) {
+(void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {});
+}
+}
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));
@@ -1154,6 +1189,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
_stale_versions_in_use = _versions_barrier.advance_and_await();
}
+tmptr->set_shared_token_metadata(*this);
_shared = std::move(tmptr);
_shared->set_version_tracker(new_tracker(_shared->get_version()));
@@ -1216,7 +1252,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.resize(smp::count);
-auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async());
+auto tmptr = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async());
auto& tm = *tmptr;
// bump the token_metadata ring_version
// to invalidate cached token/replication mappings
@@ -1227,7 +1263,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
// Apply the mutated token_metadata only after successfully cloning it on all shards.
pending_token_metadata_ptr[base_shard] = tmptr;
co_await smp::invoke_on_others(base_shard, [&] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async());
pending_token_metadata_ptr[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async());
});
co_await stm.invoke_on_all([&] (shared_token_metadata& stm) {

View File

@@ -47,7 +47,7 @@ class abstract_replication_strategy;
using token = dht::token;
class token_metadata;
class shared_token_metadata;
class tablet_metadata;
struct host_id_or_endpoint {
@@ -166,6 +166,7 @@ private:
};
class token_metadata final {
shared_token_metadata* _shared_token_metadata = nullptr;
std::unique_ptr<token_metadata_impl> _impl;
private:
friend class token_metadata_ring_splitter;
@@ -178,7 +179,7 @@ public:
using version_t = service::topology::version_t;
using version_tracker_t = version_tracker;
token_metadata(config cfg);
token_metadata(shared_token_metadata& stm, config cfg);
explicit token_metadata(std::unique_ptr<token_metadata_impl> impl);
token_metadata(token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr
token_metadata& operator=(token_metadata&&) noexcept;
@@ -355,6 +356,11 @@ public:
friend class shared_token_metadata;
private:
void set_version_tracker(version_tracker_t tracker);
void set_shared_token_metadata(shared_token_metadata& stm);
// Clears and disposes the token metadata impl in the background, if present.
void clear_and_dispose_impl() noexcept;
};
struct topology_change_info {
@@ -371,12 +377,8 @@ struct topology_change_info {
using token_metadata_lock = semaphore_units<>;
using token_metadata_lock_func = noncopyable_function<future<token_metadata_lock>() noexcept>;
template <typename... Args>
mutable_token_metadata_ptr make_token_metadata_ptr(Args... args) {
return make_lw_shared<token_metadata>(std::forward<Args>(args)...);
}
class shared_token_metadata {
class shared_token_metadata : public peering_sharded_service<shared_token_metadata> {
named_gate _background_dispose_gate{"shared_token_metadata::background_dispose_gate"};
mutable_token_metadata_ptr _shared;
token_metadata_lock_func _lock_func;
std::chrono::steady_clock::duration _stall_detector_threshold = std::chrono::seconds(2);
@@ -408,7 +410,7 @@ public:
// used to construct the shared object as a sharded<> instance
// lock_func returns semaphore_units<>
explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg)
: _shared(make_token_metadata_ptr(std::move(cfg)))
: _shared(make_lw_shared<token_metadata>(*this, cfg))
, _lock_func(std::move(lock_func))
, _versions_barrier("shared_token_metadata::versions_barrier")
{
@@ -418,6 +420,17 @@ public:
shared_token_metadata(const shared_token_metadata& x) = delete;
shared_token_metadata(shared_token_metadata&& x) = default;
future<> stop() noexcept;
mutable_token_metadata_ptr make_token_metadata_ptr() {
return make_lw_shared<token_metadata>(*this, token_metadata::config{_shared->get_topology().get_config()});
}
mutable_token_metadata_ptr make_token_metadata_ptr(token_metadata&& tm) {
tm.set_shared_token_metadata(*this);
return make_lw_shared<token_metadata>(std::move(tm));
}
token_metadata_ptr get() const noexcept {
return _shared;
}
@@ -467,6 +480,8 @@ public:
// Must be called on shard 0.
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func);
void clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept;
private:
// for testing only, unsafe to be called without awaiting get_lock() first
void mutate_token_metadata_for_test(seastar::noncopyable_function<void (token_metadata&)> func);

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:f69e30ac03713e439d4f9fe347aafe2201d8605880358d3142b6f6bc706c3014
size 5966816
oid sha256:4c7c513b0a83214e35598a41db10ddb9a4266a63f640d2a49e35d646061969b1
size 5990560

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:9ec68edb2980fae1fcf63046b399f30b882fc7b77b4bc316c7055f75820d26f1
size 5975376
oid sha256:ad22fc390a168892eda150ed19966405e2ceb3393cbf507ebd5df0f1332a869c
size 6009316

View File

@@ -2244,7 +2244,7 @@ future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator
auto reason = streaming::stream_reason::replace;
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
auto cloned_tmptr = _db.local().get_shared_token_metadata().make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing);
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
auto source_dc = utils::optional_param(myloc.dc);
@@ -2283,7 +2283,8 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
}
table_id tid = t->schema()->id();
// Invoke group0 read barrier before obtaining erm pointer so that it sees all prior metadata changes
auto dropped = co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
auto dropped = !utils::get_local_injector().enter("repair_tablets_no_sync") &&
co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
if (dropped) {
rlogger.debug("repair[{}] Table {}.{} does not exist anymore", rid.uuid(), keyspace_name, table_name);
continue;
@@ -2292,11 +2293,15 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
while (true) {
_repair_module->check_in_shutdown();
erm = t->get_effective_replication_map();
auto local_version = erm->get_token_metadata().get_version();
const locator::tablet_map& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
if (!tmap.has_transitions()) {
if (!tmap.has_transitions() && co_await container().invoke_on(0, [local_version] (repair_service& rs) {
// We need to ensure that there is no ongoing global request.
return local_version == rs._tsm.local()._topology.version && !rs._tsm.local()._topology.is_busy();
})) {
break;
}
rlogger.info("repair[{}] Table {}.{} has tablet transitions, waiting for topology to quiesce", rid.uuid(), keyspace_name, table_name);
rlogger.info("repair[{}] Topology is busy, waiting for it to quiesce", rid.uuid());
erm = nullptr;
co_await container().invoke_on(0, [] (repair_service& rs) {
return rs._tsm.local().await_not_busy();

View File

@@ -1448,7 +1448,9 @@ private:
size_t row_bytes = co_await get_repair_rows_size(row_list);
_metrics.tx_row_nr += row_list.size();
_metrics.tx_row_bytes += row_bytes;
for (repair_row& r : row_list) {
while (!row_list.empty()) {
repair_row r = std::move(row_list.front());
row_list.pop_front();
const auto& dk_with_hash = r.get_dk_with_hash();
// No need to search from the beginning of the rows. Looking at the end of repair_rows_on_wire is enough.
if (rows.empty()) {
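The `row_list` change above switches from iterating the list to popping from its front, so each `repair_row`'s node is freed as soon as it is consumed instead of the whole list being held until the loop ends. A plain-C++ sketch of that consume-as-you-go shape (`drain` is a hypothetical helper, not repair code):

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <utility>

// Consume a std::list front-to-back, releasing each node immediately,
// so peak memory is one element plus whatever the consumer retains.
template <typename T, typename F>
std::size_t drain(std::list<T>& xs, F&& consume) {
    std::size_t n = 0;
    while (!xs.empty()) {
        T v = std::move(xs.front());
        xs.pop_front();            // node memory released right away
        consume(std::move(v));
        ++n;
    }
    return n;
}
```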

View File

@@ -355,7 +355,7 @@ database::view_update_read_concurrency_sem() {
return *sem;
}
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory& scf, const abort_source& abort, utils::cross_shard_barrier barrier)
: _stats(make_lw_shared<db_stats>())
, _user_types(std::make_shared<db_user_types_storage>(*this))

View File

@@ -1599,7 +1599,7 @@ private:
service::migration_notifier& _mnotifier;
gms::feature_service& _feat;
std::vector<std::any> _listeners;
const locator::shared_token_metadata& _shared_token_metadata;
locator::shared_token_metadata& _shared_token_metadata;
lang::manager& _lang_manager;
reader_concurrency_semaphore_group _reader_concurrency_semaphores_group;
@@ -1684,7 +1684,7 @@ public:
// (keyspace/table definitions, column mappings etc.)
future<> parse_system_tables(distributed<service::storage_proxy>&, sharded<db::system_keyspace>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&, const abort_source& abort,
utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
database(database&&) = delete;
@@ -1719,7 +1719,7 @@ public:
return _compaction_manager;
}
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); }
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }

View File

@@ -1101,26 +1101,23 @@ private:
global_request_id = guard.new_group0_state_id();
std::vector<canonical_mutation> updates;
topology_mutation_builder builder(guard.write_timestamp());
topology_request_tracking_mutation_builder trbuilder(global_request_id, _sp._features.topology_requests_type_column);
trbuilder.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now());
if (!_sp._features.topology_global_request_queue) {
builder.set_global_topology_request(global_topology_request::truncate_table)
.set_global_topology_request_id(global_request_id);
} else {
builder.queue_global_topology_request_id(global_request_id);
trbuilder.set("request_type", global_topology_request::truncate_table);
}
updates.emplace_back(builder.build());
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id, _sp._features.topology_requests_type_column)
.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::truncate_table)
.build());
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
topology_change change{std::move(updates)};
topology_change change{{builder.build(), trbuilder.build()}};
sstring reason = "Truncating table";
group0_command g0_cmd = _group0_client.prepare_command(std::move(change), guard, reason);
try {
@@ -1615,6 +1612,10 @@ public:
return _type == db::write_type::VIEW;
}
bool is_batch() const noexcept {
return _type == db::write_type::BATCH;
}
void set_cdc_operation_result_tracker(lw_shared_ptr<cdc::operation_result_tracker> tracker) {
_cdc_operation_result_tracker = std::move(tracker);
}
@@ -2120,7 +2121,7 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte
// create_write_response_handler is overloaded for paxos::proposal and will
// create cas_mutation holder, which consequently will ensure paxos::learn is
// used.
auto f = _proxy->mutate_internal(std::move(m), db::consistency_level::ANY, false, tr_state, _permit, _timeout)
auto f = _proxy->mutate_internal(std::move(m), db::consistency_level::ANY, tr_state, _permit, _timeout)
.then(utils::result_into_future<result<>>);
// TODO: provided commits did not invalidate the prepare we just did above (which they
@@ -2472,7 +2473,7 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
return v.schema()->id() == base_tbl_id;
});
if (!mutations.empty()) {
f_cdc = _proxy->mutate_internal(std::move(mutations), _cl_for_learn, false, tr_state, _permit, _timeout, std::move(tracker))
f_cdc = _proxy->mutate_internal(std::move(mutations), _cl_for_learn, tr_state, _permit, _timeout, {}, std::move(tracker))
.then(utils::result_into_future<result<>>);
}
}
@@ -2480,7 +2481,7 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
// Path for the "base" mutations
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout)
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, tr_state, _permit, _timeout)
.then(utils::result_into_future<result<>>);
co_await when_all_succeed(std::move(f_cdc), std::move(f_lwt)).discard_result();
@@ -3071,6 +3072,10 @@ struct hint_wrapper {
mutation mut;
};
struct batchlog_replay_mutation {
mutation mut;
};
struct read_repair_mutation {
std::unordered_map<locator::host_id, std::optional<mutation>> value;
locator::effective_replication_map_ptr ermp;
@@ -3084,6 +3089,12 @@ template <> struct fmt::formatter<service::hint_wrapper> : fmt::formatter<string
}
};
template <> struct fmt::formatter<service::batchlog_replay_mutation> : fmt::formatter<string_view> {
auto format(const service::batchlog_replay_mutation& h, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "batchlog_replay_mutation{{{}}}", h.mut);
}
};
template <>
struct fmt::formatter<service::read_repair_mutation> : fmt::formatter<string_view> {
auto format(const service::read_repair_mutation& m, fmt::format_context& ctx) const {
@@ -3449,6 +3460,12 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste
std::move(permit), allow_limit, is_cancellable::yes);
}
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const batchlog_replay_mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
return create_write_response_handler_helper(m.mut.schema(), m.mut.token(), std::make_unique<shared_mutation>(m.mut), cl, type, tr_state,
std::move(permit), allow_limit, is_cancellable::yes);
}
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
host_id_vector_replica_set endpoints;
@@ -3843,7 +3860,7 @@ future<result<>> storage_proxy::do_mutate(std::vector<mutation> mutations, db::c
}).begin();
return seastar::when_all_succeed(
mutate_counters(std::ranges::subrange(mutations.begin(), mid), cl, tr_state, permit, timeout),
mutate_internal(std::ranges::subrange(mid, mutations.end()), cl, false, tr_state, permit, timeout, std::move(cdc_tracker), allow_limit)
mutate_internal(std::ranges::subrange(mid, mutations.end()), cl, tr_state, permit, timeout, {}, std::move(cdc_tracker), allow_limit)
).then([] (std::tuple<result<>> res) {
// For now, only mutate_internal returns a result<>
return std::get<0>(std::move(res));
@@ -3852,8 +3869,10 @@ future<result<>> storage_proxy::do_mutate(std::vector<mutation> mutations, db::c
future<> storage_proxy::replicate_counter_from_leader(mutation m, db::consistency_level cl, tracing::trace_state_ptr tr_state,
clock_type::time_point timeout, service_permit permit) {
// we need to pass correct db::write_type in case of a timeout so that
// client doesn't attempt to retry the request.
// FIXME: do not send the mutation to itself, it has already been applied (it is not incorrect to do so, though)
return mutate_internal(std::array<mutation, 1>{std::move(m)}, cl, true, std::move(tr_state), std::move(permit), timeout)
return mutate_internal(std::array<mutation, 1>{std::move(m)}, cl, std::move(tr_state), std::move(permit), timeout, db::write_type::COUNTER)
.then(utils::result_into_future<result<>>);
}
@@ -3864,8 +3883,8 @@ future<> storage_proxy::replicate_counter_from_leader(mutation m, db::consistenc
*/
template<typename Range>
future<result<>>
storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool counters, tracing::trace_state_ptr tr_state, service_permit permit,
std::optional<clock_type::time_point> timeout_opt, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker,
storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit,
std::optional<clock_type::time_point> timeout_opt, std::optional<db::write_type> type_opt, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker,
db::allow_per_partition_rate_limit allow_limit) {
if (std::ranges::empty(mutations)) {
return make_ready_future<result<>>(bo::success());
@@ -3874,12 +3893,10 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c
slogger.trace("mutate cl={}", cl);
mlogger.trace("mutations={}", mutations);
// If counters is set it means that we are replicating counter shards. There
// is no need for special handling anymore, since the leader has already
// done its job, but we need to return correct db::write_type in case of
// a timeout so that client doesn't attempt to retry the request.
auto type = counters ? db::write_type::COUNTER
: (std::next(std::begin(mutations)) == std::end(mutations) ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH);
// The type_opt parameter allows passing a specific write type when special
// handling is needed, e.g. for counters; otherwise a default type is used.
auto type = type_opt.value_or(std::next(std::begin(mutations)) == std::end(mutations) ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH);
utils::latency_counter lc;
lc.start();
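The new `type_opt` logic boils down to `std::optional::value_or`: an explicitly supplied write type wins, otherwise a single mutation means `SIMPLE` and several mean `UNLOGGED_BATCH`. A standalone sketch of just that selection (`pick_type` and the trimmed `write_type` enum are illustrative, not the real `db::write_type`):

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

enum class write_type { SIMPLE, UNLOGGED_BATCH, COUNTER, BATCH };

// Mirrors the selection in mutate_internal(): an explicit type wins,
// otherwise the default depends on the number of mutations.
inline write_type pick_type(std::optional<write_type> type_opt, std::size_t n_mutations) {
    return type_opt.value_or(n_mutations == 1 ? write_type::SIMPLE
                                              : write_type::UNLOGGED_BATCH);
}
```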
@@ -4065,6 +4082,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
};
future<> async_remove_from_batchlog() {
// delete batch
utils::get_local_injector().inject("storage_proxy_fail_remove_from_batchlog", [] { throw std::runtime_error("Error injection: failing remove from batchlog"); });
auto key = partition_key::from_exploded(*_schema, {uuid_type->decompose(_batch_uuid)});
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
mutation m(_schema, key);
@@ -4136,13 +4154,15 @@ mutation storage_proxy::do_get_batchlog_mutation_for(schema_ptr schema, const st
for (auto& m : fm) {
ser::serialize(out, m);
}
return to_bytes(out.linearize());
return std::move(out).to_managed_bytes();
}();
mutation m(schema, key);
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("version"), version, timestamp);
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("written_at"), now, timestamp);
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("data"), data_value(std::move(data)), timestamp);
// Avoid going through data_value and therefore `bytes`, as it can be large (#24809).
auto cdef_data = schema->get_column_definition(to_bytes("data"));
m.set_cell(clustering_key_prefix::make_empty(), *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
return m;
}
@@ -4248,7 +4268,16 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) {
std::array<hint_wrapper, 1> ms{hint_wrapper { fm_a_s.fm.unfreeze(fm_a_s.s) }};
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit())
return mutate_internal(std::move(ms), db::consistency_level::ALL, nullptr, empty_service_permit())
.then(utils::result_into_future<result<>>);
}
future<> storage_proxy::send_batchlog_replay_to_all_replicas(std::vector<mutation> mutations, clock_type::time_point timeout) {
std::vector<batchlog_replay_mutation> ms = mutations | std::views::transform([] (auto&& m) {
return batchlog_replay_mutation(std::move(m));
}) | std::ranges::to<std::vector<batchlog_replay_mutation>>();
return mutate_internal(std::move(ms), db::consistency_level::ALL, nullptr, empty_service_permit(), timeout, db::write_type::BATCH)
.then(utils::result_into_future<result<>>);
}
@@ -4431,7 +4460,7 @@ future<result<>> storage_proxy::schedule_repair(locator::effective_replication_m
std::views::transform([ermp] (auto& v) { return read_repair_mutation{std::move(v), ermp}; }) |
// The transform above is destructive, materialize into a vector to make the range re-iterable.
std::ranges::to<std::vector<read_repair_mutation>>()
, cl, false, std::move(trace_state), std::move(permit));
, cl, std::move(trace_state), std::move(permit));
}
class abstract_read_resolver {
@@ -6964,6 +6993,12 @@ future<> storage_proxy::abort_view_writes() {
});
}
future<> storage_proxy::abort_batch_writes() {
return async([this] {
cancel_write_handlers([] (const abstract_write_response_handler& handler) { return handler.is_batch(); });
});
}
future<>
storage_proxy::stop() {
return make_ready_future<>();

View File

@@ -87,6 +87,7 @@ class mutation_holder;
class client_state;
class migration_manager;
struct hint_wrapper;
struct batchlog_replay_mutation;
struct read_repair_mutation;
using replicas_per_token_range = std::unordered_map<dht::token_range, std::vector<locator::host_id>>;
@@ -340,6 +341,7 @@ private:
const host_id_vector_topology_change& pending_endpoints, host_id_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const batchlog_replay_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
@@ -427,7 +429,7 @@ private:
void unthrottle();
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);
template<typename Range>
future<result<>> mutate_internal(Range mutations, db::consistency_level cl, bool counter_write, tracing::trace_state_ptr tr_state, service_permit permit, std::optional<clock_type::time_point> timeout_opt = { }, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker = { }, db::allow_per_partition_rate_limit allow_limit = db::allow_per_partition_rate_limit::no);
future<result<>> mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, std::optional<clock_type::time_point> timeout_opt = { }, std::optional<db::write_type> type = { }, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker = { }, db::allow_per_partition_rate_limit allow_limit = db::allow_per_partition_rate_limit::no);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_nonsingular_mutations_locally(
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector&& pr, tracing::trace_state_ptr trace_state,
clock_type::time_point timeout);
@@ -631,6 +633,8 @@ public:
future<> send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s);
future<> send_batchlog_replay_to_all_replicas(std::vector<mutation> mutations, clock_type::time_point timeout);
// Send a mutation to one specific remote target.
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also
@@ -705,6 +709,7 @@ public:
void allow_replaying_hints() noexcept;
future<> drain_hints_for_left_nodes();
future<> abort_view_writes();
future<> abort_batch_writes();
future<> change_hints_host_filter(db::hints::host_filter new_filter);
const db::hints::host_filter& get_hints_host_filter() const;

View File

@@ -111,7 +111,6 @@
#include "node_ops/task_manager_module.hh"
#include "service/task_manager_module.hh"
#include "service/topology_mutation.hh"
#include "service/topology_coordinator.hh"
#include "cql3/query_processor.hh"
#include "service/qos/service_level_controller.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
@@ -740,9 +739,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
auto saved_tmpr = get_token_metadata_ptr();
{
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = make_token_metadata_ptr(token_metadata::config {
get_token_metadata().get_topology().get_config()
});
auto tmptr = _shared_token_metadata.make_token_metadata_ptr();
tmptr->invalidate_cached_rings();
tmptr->set_version(_topology_state_machine._topology.version);
@@ -1134,7 +1131,8 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::hol
_tablet_allocator.local(),
get_ring_delay(),
_lifecycle_notifier,
_feature_service);
_feature_service,
_topology_cmd_rpc_tracker);
}
} catch (...) {
rtlogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception());
@@ -3146,9 +3144,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
try {
auto base_shard = this_shard_id();
pending_token_metadata_ptr[base_shard] = tmptr;
auto& sharded_token_metadata = _shared_token_metadata.container();
// clone a local copy of updated token_metadata on all other shards
co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async());
pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
});
// Precalculate new effective_replication_map for all keyspaces
@@ -5747,7 +5746,7 @@ future<> storage_service::snitch_reconfigured() {
future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd) {
raft_topology_cmd_result result;
rtlogger.debug("topology cmd rpc {} is called", cmd.cmd);
rtlogger.info("topology cmd rpc {} is called index={}", cmd.cmd, cmd_index);
try {
auto& raft_server = _group0->group0_server();
@@ -6077,6 +6076,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
} catch (...) {
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
}
rtlogger.info("topology cmd rpc {} completed with status={} index={}",
cmd.cmd, (result.status == raft_topology_cmd_result::command_status::success) ? "succeeded" : "failed", cmd_index);
co_return result;
}

View File

@@ -48,6 +48,7 @@
#include "timestamp.hh"
#include "utils/user_provided_param.hh"
#include "utils/sequenced_set.hh"
#include "service/topology_coordinator.hh"
class node_ops_cmd_request;
class node_ops_cmd_response;
@@ -282,12 +283,12 @@ private:
future<> snitch_reconfigured();
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
return _shared_token_metadata.get()->clone_async().then([] (token_metadata tm) {
return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) {
// bump the token_metadata ring_version
// to invalidate cached token/replication mappings
// when the modified token_metadata is committed.
tm.invalidate_cached_rings();
return make_ready_future<mutable_token_metadata_ptr>(make_token_metadata_ptr(std::move(tm)));
return _shared_token_metadata.make_token_metadata_ptr(std::move(tm));
});
}
@@ -873,6 +874,11 @@ private:
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
tablet_op_registry _tablet_ops;
// Tracks the active topology cmd rpc. Only one cmd can be running at a
// time; inspecting this structure shows which cmd is currently executing
// and which nodes have not replied yet.
// Needed for debugging.
topology_coordinator_cmd_rpc_tracker _topology_cmd_rpc_tracker;
struct {
raft::term_t term{0};
uint64_t last_index{0};
@@ -941,6 +947,10 @@ public:
// Waits for topology state in which none of tablets has replaced_id as a replica.
// Must be called on shard 0.
future<> await_tablets_rebuilt(raft::server_id replaced_id);
topology_coordinator_cmd_rpc_tracker get_topology_cmd_status() {
return _topology_cmd_rpc_tracker;
}
private:
// Tracks progress of the upgrade to topology coordinator.
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();

View File

@@ -842,7 +842,7 @@ public:
db_clock::duration repair_time_diff;
};
std::vector<repair_plan> plans;
utils::chunked_vector<repair_plan> plans;
auto migration_tablet_ids = co_await mplan.get_migration_tablet_ids();
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = *tmap_;
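The hunk above makes `make_repair_plan()` accumulate plans in `utils::chunked_vector` so no single allocation grows with the plan count past the 128k large-allocation threshold. A toy illustration of the idea, assuming nothing about `utils::chunked_vector`'s real implementation or API:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Elements live in fixed-size chunks, so each allocation is bounded by
// ChunkSize regardless of total element count (illustrative only).
template <typename T, std::size_t ChunkSize = 1024>
class chunked {
    std::vector<std::unique_ptr<std::vector<T>>> _chunks;
    std::size_t _size = 0;
public:
    void push_back(T v) {
        if (_size % ChunkSize == 0) {
            auto c = std::make_unique<std::vector<T>>();
            c->reserve(ChunkSize);   // bounded allocation per chunk
            _chunks.push_back(std::move(c));
        }
        _chunks.back()->push_back(std::move(v));
        ++_size;
    }
    T& operator[](std::size_t i) { return (*_chunks[i / ChunkSize])[i % ChunkSize]; }
    std::size_t size() const { return _size; }
};
```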

View File

@@ -147,6 +147,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
group0_voter_handler _voter_handler;
topology_coordinator_cmd_rpc_tracker& _topology_cmd_rpc_tracker;
const locator::token_metadata& get_token_metadata() const noexcept {
return *_shared_tm.get();
}
@@ -389,6 +391,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
future<> exec_direct_command_helper(raft::server_id id, uint64_t cmd_index, const raft_topology_cmd& cmd) {
rtlogger.debug("send {} command with term {} and index {} to {}",
cmd.cmd, _term, cmd_index, id);
_topology_cmd_rpc_tracker.active_dst.emplace(id);
auto _ = seastar::defer([this, id] { _topology_cmd_rpc_tracker.active_dst.erase(id); });
auto result = _db.get_token_metadata().get_topology().is_me(to_host_id(id)) ?
co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) :
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
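`exec_direct_command_helper()` above records the destination id in `active_dst` and relies on `seastar::defer` to erase it on every exit path, including exceptions. The same guarantee in plain C++ with a minimal scope guard (illustrative, not Seastar's `defer`):

```cpp
#include <cassert>
#include <set>
#include <utility>

// RAII guard: runs the stored callable on scope exit, even on exception.
template <typename F>
struct scope_guard {
    F f;
    ~scope_guard() { f(); }
};
template <typename F> scope_guard(F) -> scope_guard<F>;
```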
@@ -403,12 +408,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto id = node.id;
release_node(std::move(node));
const auto cmd_index = ++_last_cmd_index;
_topology_cmd_rpc_tracker.current = cmd.cmd;
_topology_cmd_rpc_tracker.index = cmd_index;
co_await exec_direct_command_helper(id, cmd_index, cmd);
co_return retake_node(co_await start_operation(), id);
};
future<> exec_global_command_helper(auto nodes, const raft_topology_cmd& cmd) {
const auto cmd_index = ++_last_cmd_index;
_topology_cmd_rpc_tracker.current = cmd.cmd;
_topology_cmd_rpc_tracker.index = cmd_index;
auto f = co_await coroutine::as_future(
seastar::parallel_for_each(std::move(nodes), [this, &cmd, cmd_index] (raft::server_id id) {
return exec_direct_command_helper(id, cmd_index, cmd);
@@ -1730,6 +1739,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
future<> handle_tablet_resize_finalization(group0_guard g) {
co_await utils::get_local_injector().inject("handle_tablet_resize_finalization_wait", [] (auto& handler) -> future<> {
rtlogger.info("handle_tablet_resize_finalization: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60});
});
// Executes a global barrier to guarantee that any process (e.g. repair) holding stale version
// of token metadata will complete before we update topology.
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
@@ -2988,7 +3002,8 @@ public:
raft_topology_cmd_handler_type raft_topology_cmd_handler,
tablet_allocator& tablet_allocator,
std::chrono::milliseconds ring_delay,
-gms::feature_service& feature_service)
+gms::feature_service& feature_service,
+topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
, _group0(group0), _topo_sm(topo_sm), _as(as)
@@ -3000,6 +3015,7 @@ public:
, _ring_delay(ring_delay)
, _group0_holder(_group0.hold_group0_gate())
, _voter_handler(group0, topo_sm._topology, gossiper, feature_service)
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator")
{}
@@ -3614,7 +3630,8 @@ future<> run_topology_coordinator(
tablet_allocator& tablet_allocator,
std::chrono::milliseconds ring_delay,
endpoint_lifecycle_notifier& lifecycle_notifier,
-gms::feature_service& feature_service) {
+gms::feature_service& feature_service,
+topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
topology_coordinator coordinator{
sys_dist_ks, gossiper, messaging, shared_tm,
@@ -3622,7 +3639,8 @@ future<> run_topology_coordinator(
std::move(raft_topology_cmd_handler),
tablet_allocator,
ring_delay,
-feature_service};
+feature_service,
+topology_cmd_rpc_tracker};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);


@@ -62,6 +62,12 @@ future<> wait_for_gossiper(raft::server_id id, const gms::gossiper& g, seastar::
using raft_topology_cmd_handler_type = noncopyable_function<future<raft_topology_cmd_result>(
raft::term_t, uint64_t, const raft_topology_cmd&)>;
struct topology_coordinator_cmd_rpc_tracker {
raft_topology_cmd::command current;
uint64_t index;
std::set<raft::server_id> active_dst;
};
future<> run_topology_coordinator(
seastar::sharded<db::system_distributed_keyspace>& sys_dist_ks, gms::gossiper& gossiper,
netw::messaging_service& messaging, locator::shared_token_metadata& shared_tm,
@@ -71,6 +77,7 @@ future<> run_topology_coordinator(
tablet_allocator& tablet_allocator,
std::chrono::milliseconds ring_delay,
endpoint_lifecycle_notifier& lifecycle_notifier,
-gms::feature_service& feature_service);
+gms::feature_service& feature_service,
+topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
}


@@ -14,6 +14,7 @@
#include <functional>
#include <seastar/core/on_internal_error.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/closeable.hh>
#include "locator/types.hh"
#include "test/lib/scylla_test_case.hh"
@@ -213,6 +214,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, node1_shard_count);


@@ -280,6 +280,7 @@ void simple_test() {
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
std::vector<ring_point> ring_points = {
{ 1.0, inet_address("192.100.10.1") },
@@ -363,6 +364,7 @@ void heavy_origin_test() {
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); },
locator::token_metadata::config{locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_stm = deferred_stop(stm);
std::vector<int> dc_racks = {2, 4, 8};
std::vector<int> dc_endpoints = {128, 256, 512};
@@ -476,6 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
@@ -567,6 +570,7 @@ static void test_random_balancing(sharded<snitch_ptr>& snitch, gms::inet_address
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
@@ -897,6 +901,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
for (size_t run = 0; run < RUNS; ++run) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
std::unordered_set<dht::token> random_tokens;
while (random_tokens.size() < nodes.size() * VNODES) {
@@ -1043,6 +1048,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) {
auto& topo = tm.get_topology();
generate_topology(topo, datacenters, nodes);
@@ -1087,6 +1093,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_sort_by_proximity) {
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
generate_topology(tm.get_topology(), datacenters, nodes);
return make_ready_future();
@@ -1122,6 +1129,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
.local_dc_rack = ip1_dc_rack,
}
});
auto stop_stm = deferred_stop(stm);
// get_location() should work before any node is added
@@ -1249,6 +1257,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
@@ -1401,6 +1410,7 @@ void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {


@@ -14,7 +14,9 @@
#include <fmt/std.h>
#include <seastar/core/future.hh>
#include <seastar/util/closeable.hh>
#include "seastarx.hh"
#include "service/qos/qos_common.hh"
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_utils.hh"
@@ -107,6 +109,7 @@ SEASTAR_THREAD_TEST_CASE(subscriber_simple) {
sl_options.shares.emplace<int32_t>(1000);
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
@@ -180,6 +183,7 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) {
sl_options.workload = service_level_options::workload_type::interactive;
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg1", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
@@ -256,6 +260,7 @@ SEASTAR_THREAD_TEST_CASE(add_remove_bad_sequence) {
sl_options.shares.emplace<int32_t>(1000);
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg3", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
@@ -282,6 +287,7 @@ SEASTAR_THREAD_TEST_CASE(verify_unset_shares_in_cache_when_service_level_created
sl_options.shares.emplace<int32_t>(1000);
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
auto stop_tm = deferred_stop(tm);
sharded<abort_source> as;
as.start().get();


@@ -52,9 +52,11 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
}
};
auto& stm = e.shared_token_metadata().local();
{
// Ring with minimum token
-auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()});
+auto tmptr = stm.make_token_metadata_ptr();
const auto host_id = locator::host_id{utils::UUID(0, 1)};
tmptr->update_topology(host_id, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal);
tmptr->update_normal_tokens(std::unordered_set<dht::token>({dht::minimum_token()}), host_id).get();
@@ -69,7 +71,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
}
{
-auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()});
+auto tmptr = stm.make_token_metadata_ptr();
const auto id1 = locator::host_id{utils::UUID(0, 1)};
const auto id2 = locator::host_id{utils::UUID(0, 2)};
tmptr->update_topology(id1, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal);


@@ -799,6 +799,7 @@ SEASTAR_TEST_CASE(test_get_shard) {
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
auto stop_stm = deferred_stop(stm);
tablet_id tid(0);
tablet_id tid1(0);
@@ -1048,7 +1049,7 @@ SEASTAR_TEST_CASE(test_sharder) {
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
-token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
+token_metadata tokm(e.get_shared_token_metadata().local(), token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
tokm.get_topology().add_or_update_endpoint(h1);
std::vector<tablet_id> tablet_ids;
@@ -1263,7 +1264,14 @@ SEASTAR_TEST_CASE(test_intranode_sharding) {
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
-token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
+locator::token_metadata::config tm_cfg;
+tm_cfg.topo_cfg.this_host_id = h1;
+tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location;
+semaphore sem(1);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto tmptr = stm.make_token_metadata_ptr();
+auto& tokm = *tmptr;
tokm.get_topology().add_or_update_endpoint(h1);
auto leaving_replica = tablet_replica{h1, 5};
@@ -3606,6 +3614,7 @@ static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id;
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
// Initialize the token_metadata
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {


@@ -8,6 +8,7 @@
#include <boost/test/unit_test.hpp>
#include <fmt/ranges.h>
#include <seastar/util/closeable.hh>
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_utils.hh"
#include "locator/token_metadata.hh"
@@ -31,13 +32,11 @@ namespace {
};
}
-mutable_token_metadata_ptr create_token_metadata(host_id this_host_id) {
-return make_lw_shared<token_metadata>(token_metadata::config {
-topology::config {
-.this_host_id = this_host_id,
-.local_dc_rack = get_dc_rack(this_host_id)
-}
-});
+token_metadata::config create_token_metadata_config(host_id this_host_id) {
+return token_metadata::config{topology::config{
+.this_host_id = this_host_id,
+.local_dc_rack = get_dc_rack(this_host_id)
+}};
}
template <typename Strategy>
@@ -55,7 +54,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy
const auto t1 = dht::token::from_int64(10);
const auto t2 = dht::token::from_int64(20);
-auto token_metadata = create_token_metadata(e1_id);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_normal_tokens({t1}, e1_id).get();
@@ -75,7 +78,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) {
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
-auto token_metadata = create_token_metadata(e1_id);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_normal_tokens({t1}, e1_id).get();
@@ -103,7 +110,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
-auto token_metadata = create_token_metadata(e1_id);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -133,7 +144,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
-auto token_metadata = create_token_metadata(e1_id);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -165,7 +180,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
const auto e3_id = gen_id(3);
const auto e4_id = gen_id(4);
-auto token_metadata = create_token_metadata(e1_id);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -201,7 +220,11 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
-auto token_metadata = create_token_metadata(e1_id);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
@@ -254,7 +277,11 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
const auto e1_id1 = gen_id(1);
const auto e1_id2 = gen_id(2);
-auto token_metadata = create_token_metadata(e1_id2);
+semaphore sem(1);
+auto tm_cfg = create_token_metadata_config(e1_id2);
+shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
+auto stop_stm = deferred_stop(stm);
+auto token_metadata = stm.make_token_metadata_ptr();
token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced);
token_metadata->update_normal_tokens({t1}, e1_id1).get();


@@ -0,0 +1,204 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.cluster.util import new_test_keyspace, reconnect_driver, wait_for_cql_and_get_hosts
from test.cluster.conftest import skip_mode
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_replay_while_a_node_is_down(manager: ManagerClient) -> None:
""" Test that batchlog replay handles the case when a node is down while replaying a batch.
Reproduces issue #24599.
1. Create a cluster with 3 nodes.
2. Write a batch and inject an error to fail it before it's removed from the batchlog, so it
needs to be replayed.
3. Stop server 1.
4. Server 0 tries to replay the batch. It sends the mutation to all replicas, but one of them is down,
so it should fail.
5. Bring server 1 back up.
6. Verify that the batch is replayed and removed from the batchlog eventually.
"""
cmdline = ['--logger-log-level', 'batchlog_manager=trace']
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc="dc1")
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c))")
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False) for s in servers])
# make sure the batch is replayed only after the server is stopped
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "skip_batch_replay", one_shot=False) for s in servers])
s0_log = await manager.server_open_log(servers[0].server_id)
try:
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.tab (key, c, v) VALUES (0,0,0); INSERT INTO {ks}.tab (key, c, v) VALUES (1,1,1); APPLY BATCH")
except Exception as e:
# injected error is expected
logger.error(f"Error executing batch: {e}")
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog") for s in servers])
await manager.server_stop(servers[1].server_id)
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
assert batchlog_row_count > 0
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "skip_batch_replay") for s in servers if s != servers[1]])
# The batch is replayed while server 1 is down
await s0_log.wait_for('Replaying batch', timeout=60)
await asyncio.sleep(1)
# Bring server 1 back up and verify that eventually the batch is replayed and removed from the batchlog
await manager.server_start(servers[1].server_id)
s0_mark = await s0_log.mark()
await s0_log.wait_for('Finished replayAllFailedBatches', timeout=60, from_mark=s0_mark)
async def batchlog_empty() -> bool:
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
if batchlog_row_count == 0:
return True
await wait_for(batchlog_empty, time.time() + 60)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_replay_aborted_on_shutdown(manager: ManagerClient) -> None:
""" Similar to the previous test, but also verifies that the batchlog replay is aborted on shutdown,
and node shutdown is not stuck.
1. Create a cluster with 3 nodes.
2. Write a batch and inject an error to fail it before it's removed from the batchlog, so it
needs to be replayed.
3. Stop server 1.
4. Server 0 tries to replay the batch. It sends the mutation to all replicas, but one of them is down,
so it should fail.
5. Shut down server 0 gracefully, which should abort the batchlog replay which is in progress.
6. Bring server 0 and server 1 back up.
7. Verify that the batch is replayed and removed from the batchlog eventually.
"""
cmdline = ['--logger-log-level', 'batchlog_manager=trace']
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc="dc1")
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c))")
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False) for s in servers])
# make sure the batch is replayed only after the server is stopped
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "skip_batch_replay", one_shot=False) for s in servers])
s0_log = await manager.server_open_log(servers[0].server_id)
try:
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.tab (key, c, v) VALUES (0,0,0); INSERT INTO {ks}.tab (key, c, v) VALUES (1,1,1); APPLY BATCH")
except Exception as e:
# injected error is expected
logger.error(f"Error executing batch: {e}")
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog") for s in servers])
await manager.server_stop(servers[1].server_id)
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "skip_batch_replay") for s in servers if s != servers[1]])
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
assert batchlog_row_count > 0
# The batch is replayed while server 1 is down
await s0_log.wait_for('Replaying batch', timeout=60)
await asyncio.sleep(1)
# verify shutdown is not stuck
await manager.server_stop_gracefully(servers[0].server_id)
await manager.server_start(servers[0].server_id)
await manager.server_start(servers[1].server_id)
cql = await reconnect_driver(manager)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
async def batchlog_empty() -> bool:
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
if batchlog_row_count == 0:
return True
await wait_for(batchlog_empty, time.time() + 60)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_batchlog_replay_includes_cdc(manager: ManagerClient) -> None:
""" Test that when a batch is replayed from the batchlog, it includes CDC mutations.
1. Create a cluster with a single node.
2. Create a table with CDC enabled.
3. Write a batch and inject an error to fail it after it's written to the batchlog but before the mutation is applied.
4. Wait for the batch to be replayed.
5. Verify that the data is written to the base table.
6. Verify that CDC mutations are also applied and visible in the CDC log table.
"""
cmdline = ['--logger-log-level', 'batchlog_manager=trace']
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
servers = await manager.servers_add(1, config=config, cmdline=cmdline)
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
# Create table with CDC enabled
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c)) WITH cdc = {{'enabled': true}}")
# Enable error injection to make the batch fail after writing to batchlog
await manager.api.enable_injection(servers[0].ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False)
# Execute a batch that will fail due to injection but be written to batchlog
try:
await cql.run_async(
"BEGIN BATCH " +
f"INSERT INTO {ks}.tab(key, c, v) VALUES (10, 20, 30); " +
f"INSERT INTO {ks}.tab(key, c, v) VALUES (40, 50, 60); " +
"APPLY BATCH"
)
except Exception as e:
logger.info(f"Expected error executing batch: {e}")
await manager.api.disable_injection(servers[0].ip_addr, "storage_proxy_fail_remove_from_batchlog")
# Wait for data to appear in the base table
async def data_written():
result1 = await cql.run_async(f"SELECT * FROM {ks}.tab WHERE key = 10 AND c = 20")
result2 = await cql.run_async(f"SELECT * FROM {ks}.tab WHERE key = 40 AND c = 50")
if len(result1) > 0 and len(result2) > 0:
return True
await wait_for(data_written, time.time() + 60)
# Check that CDC log table exists and has the CDC mutations
cdc_table_name = f"{ks}.tab_scylla_cdc_log"
# Wait for CDC mutations to be visible
async def cdc_data_present():
result1 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 10 ALLOW FILTERING")
result2 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 40 ALLOW FILTERING")
if len(result1) > 0 and len(result2) > 0:
return True
await wait_for(cdc_data_present, time.time() + 60)
result1 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 10 ALLOW FILTERING")
assert len(result1) == 1, f"Expected 1 CDC mutation for key 10, got {len(result1)}"
result2 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 40 ALLOW FILTERING")
assert len(result2) == 1, f"Expected 1 CDC mutation for key 40, got {len(result2)}"


@@ -1088,6 +1088,46 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
logger.info("Waiting for migrations to complete")
await log.wait_for("Tablet load balancer did not make any plan", from_mark=migration_mark)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_finalization_with_repair(manager: ManagerClient):
injection = "handle_tablet_resize_finalization_wait"
cfg = {
'enable_tablets': True,
'error_injections_at_startup': [
injection,
"repair_tablets_no_sync",
'short_tablet_stats_refresh_interval',
]
}
servers = await manager.servers_add(2, config=cfg)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH compaction = {'class': 'NullCompactionStrategy'};")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")
logs = [await manager.server_open_log(s.server_id) for s in servers]
marks = [await log.mark() for log in logs]
logger.info("Trigger split in table")
await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")
logger.info("Wait for tablets to split")
done, pending = await asyncio.wait([asyncio.create_task(log.wait_for('handle_tablet_resize_finalization: waiting', from_mark=mark)) for log, mark in zip(logs, marks)], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
async def repair():
await manager.api.client.post(f"/storage_service/repair_async/test", host=servers[0].ip_addr)
async def check_repair_waits():
await logs[0].wait_for("Topology is busy, waiting for it to quiesce", from_mark=marks[0])
await manager.api.message_injection(servers[0].ip_addr, injection)
await asyncio.gather(repair(), check_repair_waits())
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):


@@ -388,6 +388,71 @@ async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks)
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
# Reproduces #23284
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(manager: ManagerClient, racks = 2):
cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
servers = []
rf = racks
for rack_id in range(0, racks):
rack = f'rack{rack_id+1}'
servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, 200)])
async def check_logs(when):
for server in servers:
log = await manager.server_open_log(server.server_id)
matches = await log.grep("Too long queue accumulated for gossip")
if matches:
pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")
await check_logs("after creating tables")
total_keys = 400
keys = range(total_keys)
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
async def finished_splitting():
# FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
# (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
# Per-table hints (min_tablet_count) can be used to improve this.
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= 16 or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
await check_logs("after split completion")
delete_keys = range(total_keys - 1)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1, total_keys)
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
async def finished_merging():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count < old_tablet_count or None
await wait_for(finished_merging, time.time() + 120)
await check_logs("after merge completion")
# Reproduces use-after-free when migration right after merge, but concurrently to background
# merge completion handler.
# See: https://github.com/scylladb/scylladb/issues/24045


@@ -45,15 +45,14 @@ public:
template <typename T>
seastar::future<T> submit(seastar::noncopyable_function<T()> f) {
auto p = seastar::promise<T>();
-auto fut = p.get_future();
-auto wrapper = [p = std::move(p), f = std::move(f), shard = seastar::this_shard_id(), &alien = seastar::engine().alien()] () mutable noexcept {
+auto wrapper = [&p, f = std::move(f), shard = seastar::this_shard_id(), &alien = seastar::engine().alien()] () mutable noexcept {
try {
auto v = f();
-seastar::alien::run_on(alien, shard, [v = std::move(v), p = std::move(p)] () mutable noexcept {
+seastar::alien::run_on(alien, shard, [&p, v = std::move(v)] () mutable noexcept {
p.set_value(std::move(v));
});
} catch (...) {
-seastar::alien::run_on(alien, shard, [p = std::move(p), ep = std::current_exception()] () mutable noexcept {
+seastar::alien::run_on(alien, shard, [&p, ep = std::current_exception()] () mutable noexcept {
p.set_exception(ep);
});
}
@@ -63,7 +62,7 @@ public:
_pending.push(std::move(wrapper));
}
_cv.notify_one();
-return fut;
+co_return co_await p.get_future();
}
};