Compare commits

..

7 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
ea26e4b3a5 Use tablet_task_info fields for tablet task timing
For tablet_virtual_task, set creation_time to tablet_task_info::request_time
and start_time to tablet_task_info::sched_time, matching the actual semantics
of when the request was created vs when it was scheduled for execution.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 15:05:50 +00:00
copilot-swe-agent[bot]
3b793ef09f Merge topology_request_tracking_mutation_builder calls for keyspace_rf_change
Combined the two separate topology_request_tracking_mutation_builder calls
into one, setting both start_time and done status in the same mutation builder
to reduce redundancy.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:53:23 +00:00
copilot-swe-agent[bot]
df0a59ba03 Fix start_time setting to be together with operation start
Refactored start_time setting for global requests to be included in the same
mutation batch that starts the actual operation, matching the pattern used for
node operations. This avoids an extra update_topology_state call and ensures
start_time is set atomically with the operation start.

Also updated nodetool tasks documentation to include creation_time field in
example outputs for status, list, and tree commands.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:40:01 +00:00
copilot-swe-agent[bot]
69024a09b2 Add creation_time field to nodetool tasks subcommands
Extended scylla-nodetool.cc to display creation_time in all task-related outputs:
- tasks_print_status: Added creation_time to time field formatting
- tasks_print_trees: Added creation_time column to task tree display
- tasks_print_stats_list: Added creation_time column to task stats list

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:28:01 +00:00
copilot-swe-agent[bot]
bb8f28a1ab Fix start_time setting: remove from request creation, add to execution start
Remove incorrect start_time setting from request creation sites for:
- cleanup requests
- new_cdc_generation requests
- truncate_table requests
- keyspace_rf_change requests

Add start_time setting in topology_coordinator::handle_global_request
when execution begins, matching the pattern for node operations.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 14:09:22 +00:00
copilot-swe-agent[bot]
32bc7e3a1c Add creation_time field to task status and stats API
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 13:26:09 +00:00
copilot-swe-agent[bot]
fb4e37248d Initial plan 2025-12-08 13:03:16 +00:00
69 changed files with 807 additions and 1581 deletions

View File

@@ -3055,44 +3055,17 @@ public:
}
};
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit)
{
if (!cas_shard.this_shard()) {
_stats.shard_bounce_for_lwt++;
return container().invoke_on(cas_shard.shard(), _ssg,
[cs = client_state.move_to_other_shard(),
&mb = mutation_builders,
&dk,
ks = schema->ks_name(),
cf = schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(executor& self) mutable {
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt), &self]
(service::client_state& client_state) mutable {
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
});
}
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
@@ -3118,11 +3091,13 @@ struct schema_decorated_key_equal {
// FIXME: if we failed writing some of the mutations, need to return a list
// of these failed mutations rather than fail the whole write (issue #5650).
future<> executor::do_batch_write(
static future<> do_batch_write(service::storage_proxy& proxy,
smp_service_group ssg,
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit) {
service_permit permit,
stats& stats) {
if (mutation_builders.empty()) {
return make_ready_future<>();
}
@@ -3144,7 +3119,7 @@ future<> executor::do_batch_write(
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
@@ -3153,7 +3128,7 @@ future<> executor::do_batch_write(
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
} else {
// Do the write via LWT:
@@ -3165,35 +3140,46 @@ future<> executor::do_batch_write(
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto&& b : std::move(mutation_builders)) {
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
.schema = b.first,
.dk = dht::decorate_key(*b.first, b.second.pk())
});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
_stats.write_using_lwt++;
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
auto s = e.first.schema;
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
&mb = e.second,
&dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
static const auto* injection_name = "alternator_executor_batch_write_wait";
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
const auto ks = handler.get("keyspace");
const auto cf = handler.get("table");
const auto shard = std::atoll(handler.get("shard")->data());
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
elogger.info("{}: hit", injection_name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
elogger.info("{}: continue", injection_name);
}
}).then([&e, desired_shard = std::move(desired_shard),
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
{
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
});
// The desired_shard on the original shard remains alive for the duration
// of cas_write on this shard and prevents any tablet operations.
// However, we need a local instance of cas_shard on this shard
// to pass it to sp::cas, so we just create a new one.
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
}).finally([key_builders = std::move(key_builders)]{});
}
}
@@ -3341,7 +3327,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
_stats.api_operations.batch_write_item_batch_total += total_items;
_stats.api_operations.batch_write_item_histogram.add(total_items);
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
// FIXME: Issue #5650: If we failed writing some of the updates,
// need to return a list of these failed updates in UnprocessedItems
// rather than fail the whole write (issue #5650).

View File

@@ -40,7 +40,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
}
namespace cdc {
@@ -58,7 +57,6 @@ class schema_builder;
namespace alternator {
class rmw_operation;
class put_or_delete_item;
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
@@ -221,16 +219,6 @@ private:
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit);
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

View File

@@ -729,14 +729,6 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"use_sstable_identifier",
"description":"Use the sstable identifier UUID, if available, rather than the sstable generation.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
},

View File

@@ -349,9 +349,13 @@
"type":"long",
"description":"The shard the task is running on"
},
"creation_time":{
"type":"datetime",
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
},
"start_time":{
"type":"datetime",
"description":"The start time of the task; unspecified (equal to epoch) when state == created"
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
},
"end_time":{
"type":"datetime",
@@ -398,13 +402,17 @@
"type":"boolean",
"description":"Boolean flag indicating whether the task can be aborted"
},
"creation_time":{
"type":"datetime",
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
},
"start_time":{
"type":"datetime",
"description":"The start time of the task"
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
},
"end_time":{
"type":"datetime",
"description":"The end time of the task (unspecified when the task is not completed)"
"description":"The end time of the task (when execution completed); unspecified (equal to epoch) when the task is not completed"
},
"error":{
"type":"string",

View File

@@ -2020,16 +2020,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto usiopt = req->get_query_param("use_sstable_identifier");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
.use_sstable_identifier = strcasecmp(usiopt.c_str(), "true") == 0
};
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
try {
if (column_families.empty()) {
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
@@ -2037,7 +2033,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
}
co_return json_void();
} catch (...) {
@@ -2072,8 +2068,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
compaction::compaction_stats stats;

View File

@@ -55,6 +55,7 @@ tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& g
res.scope = status.scope;
res.state = status.state;
res.is_abortable = bool(status.is_abortable);
res.creation_time = get_time(status.creation_time);
res.start_time = get_time(status.start_time);
res.end_time = get_time(status.end_time);
res.error = status.error;
@@ -83,6 +84,7 @@ tm::task_stats make_stats(tasks::task_stats stats) {
res.table = stats.table;
res.entity = stats.entity;
res.shard = stats.shard;
res.creation_time = get_time(stats.creation_time);
res.start_time = get_time(stats.start_time);
res.end_time = get_time(stats.end_time);;
return res;

View File

@@ -146,8 +146,7 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();

View File

@@ -165,8 +165,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
service::topology_mutation_builder builder(ts);
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
rtbuilder.set("done", false);
if (!qp.proxy().features().topology_global_request_queue) {
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(global_request_id);

View File

@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
});
}
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
if (tag.empty()) {
throw std::runtime_error("You must supply a snapshot name.");
}
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
};
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
});
}
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
});
}
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
if (ks_name.empty()) {
throw std::runtime_error("You must supply a keyspace name");
}
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
throw std::runtime_error("You must supply a snapshot name.");
}
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
});
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
}
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {

View File

@@ -38,13 +38,10 @@ class backup_task_impl;
} // snapshot namespace
struct snapshot_options {
bool skip_flush = false;
bool use_sstable_identifier = false;
};
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
public:
using skip_flush = bool_class<class skip_flush_tag>;
struct table_snapshot_details {
int64_t total;
int64_t live;
@@ -73,8 +70,8 @@ public:
*
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
return take_snapshot(tag, {}, opts);
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
return take_snapshot(tag, {}, sf);
}
/**
@@ -83,7 +80,7 @@ public:
* @param tag the tag given to the snapshot; may not be null or empty
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
*/
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
/**
* Takes the snapshot of multiple tables. A snapshot name must be specified.
@@ -92,7 +89,7 @@ public:
* @param tables a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
/**
* Remove the snapshot with the given name from the given keyspaces.
@@ -130,8 +127,8 @@ private:
friend class snapshot::backup_task_impl;
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
};
}

View File

@@ -137,8 +137,6 @@ namespace {
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
// repair tasks
system_keyspace::REPAIR_TASKS,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -464,24 +462,6 @@ schema_ptr system_keyspace::repair_history() {
return schema;
}
schema_ptr system_keyspace::repair_tasks() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
.with_column("task_uuid", uuid_type, column_kind::partition_key)
.with_column("operation", utf8_type, column_kind::clustering_key)
// First and last token for of the tablet
.with_column("first_token", long_type, column_kind::clustering_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("timestamp", timestamp_type)
.with_column("table_uuid", uuid_type, column_kind::static_column)
.set_comment("Record tablet repair tasks")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2331,7 +2311,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
repair_tasks(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
@@ -2573,32 +2552,6 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
});
}
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
// Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
constexpr int ttl = 10 * 24 * 3600;
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
utils::chunked_vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
co_return cmuts;
}
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_task_entry ent;
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
ent.first_token = row.get_as<int64_t>("first_token");
ent.last_token = row.get_as<int64_t>("last_token");
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
co_await f(std::move(ent));
co_return stop_iteration::no;
});
}
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
@@ -3770,35 +3723,4 @@ future<> system_keyspace::apply_mutation(mutation m) {
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
{system_keyspace::repair_task_operation::requested, "requested"},
{system_keyspace::repair_task_operation::finished, "finished"},
};
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
for (auto&& [v, s] : repair_task_operation_to_name) {
result.emplace(s, v);
}
return result;
});
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
auto i = repair_task_operation_to_name.find(op);
if (i == repair_task_operation_to_name.end()) {
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
}
return i->second;
}
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
return repair_task_operation_from_name.at(name);
}
} // namespace db
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
}

View File

@@ -57,8 +57,6 @@ namespace paxos {
struct topology_request_state;
class group0_guard;
class raft_group0_client;
}
namespace netw {
@@ -186,7 +184,6 @@ public:
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
@@ -263,7 +260,6 @@ public:
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
@@ -402,22 +398,6 @@ public:
int64_t range_end;
};
enum class repair_task_operation {
requested,
finished,
};
static sstring repair_task_operation_to_string(repair_task_operation op);
static repair_task_operation repair_task_operation_from_string(const sstring& name);
struct repair_task_entry {
tasks::task_id task_uuid;
repair_task_operation operation;
int64_t first_token;
int64_t last_token;
db_clock::time_point timestamp;
table_id table_uuid;
};
struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
@@ -439,10 +419,6 @@ public:
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
future<> get_repair_history(table_id, repair_history_consumer f);
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
future<replay_positions> get_truncated_positions(table_id);
future<> drop_truncation_rp_records();
@@ -750,8 +726,3 @@ public:
}; // class system_keyspace
} // namespace db
template <>
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -1744,115 +1744,6 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
&& std::ranges::contains(shards, this_shard_id());
}
static endpoints_to_update get_view_natural_endpoint_vnodes(
locator::host_id me,
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
locator::endpoint_dc_rack my_location,
const locator::network_topology_strategy* network_topology,
replica::cf_stats& cf_stats) {
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector base_endpoints, view_endpoints;
auto& my_datacenter = my_location.dc;
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (!network_topology || node.get().dc() == my_datacenter) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
view_endpoints.push_back(view_node);
}
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
}
static std::optional<locator::host_id> get_unpaired_view_endpoint(
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
replica::cf_stats& cf_stats) {
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
for (auto&& base_node : base_nodes) {
if (base_dc_racks.contains(base_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
base_node.get().dc(), base_node.get().rack());
return std::nullopt;
}
base_dc_racks.insert(base_node.get().dc_rack());
}
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
for (auto&& view_node : view_nodes) {
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
view_node.get().dc(), view_node.get().rack());
return std::nullopt;
}
// Track unpaired replicas in both sets
if (base_dc_racks.contains(view_node.get().dc_rack())) {
paired_view_dc_racks.insert(view_node.get().dc_rack());
} else {
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
}
}
if (unpaired_view_dc_rack_replicas.size() > 0) {
// There are view replicas that can't be paired with any base replica
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
if (unpaired_view_dc_rack_replicas.size() > 0) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
unpaired_view_dc_rack_replicas | std::views::values);
}
return extra_replica;
}
return std::nullopt;
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//
@@ -1865,19 +1756,29 @@ static std::optional<locator::host_id> get_unpaired_view_endpoint(
// of this function is to find, assuming that this node is one of the base
// replicas for a given partition, the paired view replica.
//
// When using vnodes, we have an optimization called "self-pairing" - if a single
// node is both a base replica and a view replica for a write, the pairing is
// modified so that this node sends the update to itself and this node is removed
// from the lists of nodes paired by index. This self-pairing optimization can
// cause the pairing to change after view ranges are moved between nodes.
// In the past, we used an optimization called "self-pairing" that if a single
// node was both a base replica and a view replica for a write, the pairing is
// modified so that this node would send the update to itself. This self-
// pairing optimization could cause the pairing to change after view ranges
// are moved between nodes, so currently we only use it if
// use_legacy_self_pairing is set to true. When using tablets - where range
// movements are common - it is strongly recommended to set it to false.
//
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
// we pair only nodes in the same datacenter.
//
// If the table uses tablets, then pairing is rack-aware. In this case, in each
// rack where we have a base replica there is also one replica of each view tablet.
// Therefore, the base replicas are naturally paired with the view replicas that
// are in the same rack.
// When use_legacy_self_pairing is enabled, if one of the base replicas
// also happens to be a view replica, it is paired with itself
// (with the other nodes paired by order in the list
// after taking this node out).
//
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
// and the replication factor in the node's datacenter is a multiple of the number
// of racks in the datacenter, then pairing is rack-aware. In this case,
// all racks have the same number of replicas, and those are never migrated
// outside their racks. Therefore, the base replicas are naturally paired with the
// view replicas that are in the same rack, based on the ordinal position.
// Note that typically, there is a single replica per rack and pairing is trivial.
//
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
@@ -1905,12 +1806,19 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_rack_aware_view_pairing,
replica::cf_stats& cf_stats) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
auto& my_location = topology.get_location(me);
auto& my_datacenter = my_location.dc;
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
bool simple_rack_aware_pairing = false;
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector orig_base_endpoints, orig_view_endpoints;
node_vector base_endpoints, view_endpoints;
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
if (auto* np = topology.find_node(ep)) {
@@ -1921,7 +1829,6 @@ endpoints_to_update get_view_natural_endpoint(
// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
return resolve(topology, ep, false);
}) | std::ranges::to<node_vector>();
@@ -1945,43 +1852,231 @@ endpoints_to_update get_view_natural_endpoint(
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
auto leaving_base = it->get().host_id();
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
view_token, use_tablets, cf_stats);
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
}
}
}
if (!use_tablets) {
return get_view_natural_endpoint_vnodes(
me,
base_nodes,
view_nodes,
my_location,
network_topology,
cf_stats);
std::function<bool(const locator::node&)> is_candidate;
if (network_topology) {
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
} else {
is_candidate = [&] (const locator::node&) { return true; };
}
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (is_candidate(node)) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
std::optional<locator::host_id> paired_replica;
for (auto&& view_node : view_nodes) {
if (view_node.get().dc_rack() == my_location) {
paired_replica = view_node.get().host_id();
break;
if (use_legacy_self_pairing) {
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (is_candidate(view_node)) {
view_endpoints.push_back(view_node);
}
}
} else {
for (auto&& view_node : view_nodes) {
process_candidate(view_endpoints, view_node);
}
}
if (paired_replica && base_nodes.size() == view_nodes.size()) {
// We don't need to find any extra replicas, so we can return early
return {.natural_endpoint = paired_replica};
// Try optimizing for simple rack-aware pairing
// If the numbers of base and view replica differ, that means an RF change is taking place
// and we can't use simple rack-aware pairing.
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
// Simple rack-aware pairing is possible when the datacenter replication factor
// is a multiple of the number of racks in the datacenter.
if (dc_rf % racks.size() == 0) {
simple_rack_aware_pairing = true;
size_t rack_rf = dc_rf / racks.size();
// If any rack doesn't have enough nodes to satisfy the per-rack rf
// simple rack-aware pairing is disabled.
for (const auto& [rack, nodes] : racks) {
if (nodes.size() < rack_rf) {
simple_rack_aware_pairing = false;
break;
}
}
}
if (dc_rf != base_endpoints.size()) {
// If the datacenter replication factor is not equal to the number of base replicas,
// we're in progress of a RF change and we can't use simple rack-aware pairing.
simple_rack_aware_pairing = false;
}
if (simple_rack_aware_pairing) {
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
}
}
if (!paired_replica) {
// We couldn't find any view replica in our rack
orig_base_endpoints = base_endpoints;
orig_view_endpoints = view_endpoints;
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
// Use best-match, for the minimum number of base and view replicas in each rack,
// and ordinal match for the rest.
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
if (rack_aware_pairing && !simple_rack_aware_pairing) {
struct indexed_replica {
size_t idx;
std::reference_wrapper<const locator::node> node;
};
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
// First, index all replicas by rack
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
size_t idx = 0;
for (const auto& r: replicas) {
racks[r.get().rack()].emplace_back(idx++, r);
}
};
index_replica_set(base_racks, base_endpoints);
index_replica_set(view_racks, view_endpoints);
// Try optimistically pairing `me` first
const auto& my_base_replicas = base_racks[my_location.rack];
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
if (base_it == my_base_replicas.end()) {
return {};
}
const auto& my_view_replicas = view_racks[my_location.rack];
size_t idx = base_it - my_base_replicas.begin();
if (idx < my_view_replicas.size()) {
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
} else {
// If the number of view replicas is larger than the number of base replicas,
// we need to find the unpaired view replica, so we can't return yet.
paired_replica = my_view_replicas[idx].node;
}
}
// Collect all unpaired base and view replicas,
// where the number of replicas in the base rack is different than the respective view rack
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
for (const auto& [rack, base_replicas] : base_racks) {
const auto& view_replicas = view_racks[rack];
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
unpaired_base_replicas.emplace_back(base_replicas[i]);
}
}
for (const auto& [rack, view_replicas] : view_racks) {
const auto& base_replicas = base_racks[rack];
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
unpaired_view_replicas.emplace_back(view_replicas[i]);
}
}
// Sort by the original ordinality, and copy the sorted results
// back into {base,view}_endpoints, for backward compatible processing below.
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
base_endpoints.clear();
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
view_endpoints.clear();
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (!paired_replica && base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
if (!paired_replica && idx >= view_endpoints.size()) {
// There are fewer view replicas than base replicas
// FIXME: This might still happen when reducing replication factor with tablets,
// see https://github.com/scylladb/scylladb/issues/21492
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
me,
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
} else if (base_endpoints.size() < view_endpoints.size()) {
// There are fewer base replicas than view replicas.
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_endpoints.back();
if (base_endpoints.size() < view_endpoints.size() - 1) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
}
}
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
return {.natural_endpoint = paired_replica,
.endpoint_with_no_pairing = no_pairing_replica};
if (!paired_replica) {
paired_replica = view_endpoints[idx];
}
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
// This can happen when the view replica with no pairing is in another DC.
// We need to send an update to it if there are no base replicas in that DC yet,
// as it won't receive updates otherwise.
std::unordered_set<sstring> dcs_with_base_replicas;
for (const auto& base_node : base_nodes) {
dcs_with_base_replicas.insert(base_node.get().dc());
}
for (const auto& view_node : view_nodes) {
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_node;
break;
}
}
}
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to "left" state
// but still be kept as a replica.
// As of writing this hints are not prepared to handle nodes that are left
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up attempt to send the update or write a hint
// to the paired, permanently down replica.
// We use the same workaround for the extra replica.
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
if (!replica) {
return std::nullopt;
}
const auto& node = replica->get();
if (!node.left()) {
return node.host_id();
} else {
return std::nullopt;
}
};
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
}
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
@@ -2041,6 +2136,12 @@ future<> view_update_generator::mutate_MV(
{
auto& ks = _db.find_keyspace(base->ks_name());
auto& replication = ks.get_replication_strategy();
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
auto get_erm = [&] (table_id id) {
auto it = erms.find(id);
@@ -2053,6 +2154,10 @@ future<> view_update_generator::mutate_MV(
for (const auto& mut : view_updates) {
(void)get_erm(mut.s->id());
}
// Enable rack-aware view updates pairing for tablets
// when the cluster feature is enabled so that all replicas agree
// on the pairing algorithm.
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
auto me = base_ermp->get_topology().my_host_id();
static constexpr size_t max_concurrent_updates = 128;
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
@@ -2060,7 +2165,7 @@ future<> view_update_generator::mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto view_ermp = erms.at(mut.s->id());
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
ks.uses_tablets(), cf_stats);
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
if (no_pairing_endpoint) {

View File

@@ -305,7 +305,8 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_basic_rack_aware_view_pairing,
replica::cf_stats& cf_stats);
/// Verify that the provided keyspace is eligible for storing materialized views.

View File

@@ -1 +1 @@
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"

View File

@@ -45,6 +45,22 @@ immediately after it's finished.
A flag which determines if a task can be aborted through API.
# Task timing fields
Tasks have three timing fields that track different stages of their lifecycle:
- `creation_time` - When the task was created/queued. This is extracted from the task's
UUID (which is a timeuuid) and represents the moment the task request was submitted.
- `start_time` - When the task actually began executing. For tasks that are queued, this
will be unspecified (equal to epoch) until execution starts. For node operations
like decommission, this is set when the request is picked up for execution by the
topology coordinator.
- `end_time` - When the task completed (successfully or with an error). This is
unspecified (equal to epoch) until the task finishes.
The difference between `creation_time` and `start_time` represents the time a task
spent waiting in the queue before execution began.
# Type vs scope vs kind
`type` of a task describes what operation is covered by a task,

View File

@@ -17,7 +17,7 @@ SYNOPSIS
[(-u <username> | --username <username>)] snapshot
[(-cf <table> | --column-family <table> | --table <table>)]
[(-kc <kclist> | --kc.list <kclist>)]
[(-sf | --skip-flush)] [--use-sstable-identifier] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
OPTIONS
.......
@@ -37,8 +37,6 @@ Parameter Descriptio
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
--use-sstable-identifier Use the sstable identifier UUID, if available, rather than the sstable generation.
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-t <tag> / --tag <tag> The name of the snapshot
==================================================================== =====================================================================================

View File

@@ -42,21 +42,21 @@ For single list:
.. code-block:: shell
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
5116ddb6-85b5-4c3e-94fb-72128f15d7b4 repair node keyspace done 3 abc 0 2025-01-16T16:12:11Z 2025-01-16T16:12:13Z
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
5116ddb6-85b5-4c3e-94fb-72128f15d7b4 repair node keyspace done 3 abc 0 2025-01-16T16:12:08Z 2025-01-16T16:12:11Z 2025-01-16T16:12:13Z
With repetition:
.. code-block:: shell
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:54Z 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:42Z 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
1e535f9b-97fa-4788-a956-8f3216a6ea8d repair node keyspace created 6 abc 0
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
1e535f9b-97fa-4788-a956-8f3216a6ea8d repair node keyspace created 6 abc 0 2025-01-16T16:13:02Z
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:54Z 2025-01-16T16:12:57Z
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:42Z 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
See also
--------

View File

@@ -25,6 +25,7 @@ Example output
scope: keyspace
state: running
is_abortable: true
creation_time: 2024-07-29T15:48:50Z
start_time: 2024-07-29T15:48:55Z
end_time:
error:

View File

@@ -26,22 +26,22 @@ For single task:
.. code-block:: shell
id type kind scope state is_abortable start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
be5559ea-bc5a-428c-b8ce-d14eac7a1765 repair node keyspace done true 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z none 1 0 abc ranges 4 4 [{task_id: 542e38cb-9ad4-40aa-9010-de2630004e55, node: 127.0.0.1 }, {task_id: 8974ebcc-1e87-4040-88fe-f2438261f7fb, node: 127.0.0.1 }]
542e38cb-9ad4-40aa-9010-de2630004e55 repair node shard done false 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 0 abc ranges 2 2 []
8974ebcc-1e87-4040-88fe-f2438261f7fb repair node shard done false 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 1 abc ranges 2 2 []
id type kind scope state is_abortable creation_time start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
be5559ea-bc5a-428c-b8ce-d14eac7a1765 repair node keyspace done true 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z none 1 0 abc ranges 4 4 [{task_id: 542e38cb-9ad4-40aa-9010-de2630004e55, node: 127.0.0.1 }, {task_id: 8974ebcc-1e87-4040-88fe-f2438261f7fb, node: 127.0.0.1 }]
542e38cb-9ad4-40aa-9010-de2630004e55 repair node shard done false 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 0 abc ranges 2 2 []
8974ebcc-1e87-4040-88fe-f2438261f7fb repair node shard done false 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 1 abc ranges 2 2 []
For all tasks:
.. code-block:: shell
id type kind scope state is_abortable start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc repair node keyspace done true 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z none 1 0 abc ranges 4 4 [{task_id: e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee, node: 127.0.0.1 }, {task_id: 49eb5797-b67e-46b0-9365-4460f7cf988a, node: 127.0.0.1 }]
e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee repair node shard done false 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 0 abc ranges 2 2 []
49eb5797-b67e-46b0-9365-4460f7cf988a repair node shard done false 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 1 abc ranges 2 2 []
82d7b2a4-146e-4a72-ba93-c66d5b4e9867 offstrategy compaction node keyspace done true 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z none 954 0 abc 1 1 [{task_id: 9818277b-238d-4298-a56b-c0d2153bf140, node: 127.0.0.1 }, {task_id: c1eb0701-ad7a-45ff-956f-7b8d671fc5db, node: 127.0.0.1 }
9818277b-238d-4298-a56b-c0d2153bf140 offstrategy compaction node shard done false 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 0 abc 1 1 []
c1eb0701-ad7a-45ff-956f-7b8d671fc5db offstrategy compaction node shard done false 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 1 abc 1 1 []
id type kind scope state is_abortable creation_time start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc repair node keyspace done true 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z none 1 0 abc ranges 4 4 [{task_id: e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee, node: 127.0.0.1 }, {task_id: 49eb5797-b67e-46b0-9365-4460f7cf988a, node: 127.0.0.1 }]
e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee repair node shard done false 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 0 abc ranges 2 2 []
49eb5797-b67e-46b0-9365-4460f7cf988a repair node shard done false 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 1 abc ranges 2 2 []
82d7b2a4-146e-4a72-ba93-c66d5b4e9867 offstrategy compaction node keyspace done true 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z none 954 0 abc 1 1 [{task_id: 9818277b-238d-4298-a56b-c0d2153bf140, node: 127.0.0.1 }, {task_id: c1eb0701-ad7a-45ff-956f-7b8d671fc5db, node: 127.0.0.1 }
9818277b-238d-4298-a56b-c0d2153bf140 offstrategy compaction node shard done false 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 0 abc 1 1 []
c1eb0701-ad7a-45ff-956f-7b8d671fc5db offstrategy compaction node shard done false 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 1 abc 1 1 []
See also
--------

8
docs/poetry.lock generated
View File

@@ -1018,14 +1018,14 @@ sphinx-markdown-tables = "0.0.17"
[[package]]
name = "sphinx-scylladb-theme"
version = "1.8.10"
version = "1.8.9"
description = "A Sphinx Theme for ScyllaDB documentation projects"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "sphinx_scylladb_theme-1.8.10-py3-none-any.whl", hash = "sha256:8b930f33bec7308ccaa92698ebb5ad85059bcbf93a463f92917aeaf473fce632"},
{file = "sphinx_scylladb_theme-1.8.10.tar.gz", hash = "sha256:8a78a9b692d9a946be2c4a64aa472fd82204cc8ea0b1ee7f60de6db35b356326"},
{file = "sphinx_scylladb_theme-1.8.9-py3-none-any.whl", hash = "sha256:f8649a7753a29494fd2b417d1cb855035dddb9ebd498ea033fd73f5f9338271e"},
{file = "sphinx_scylladb_theme-1.8.9.tar.gz", hash = "sha256:ab7cda4c10a0d067c5c3a45f7b1f68cb8ebefe135a0be0738bfa282a344769b6"},
]
[package.dependencies]
@@ -1603,4 +1603,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "0ae673106f45d3465cbdabbf511e165ca44feadd34d7753f2e68093afaa95c79"
content-hash = "74912627a3f424290ed7889451c0bdb1a862ab85b1d07c85f4f3b8c34f32a020"

View File

@@ -9,7 +9,7 @@ package-mode = false
python = "^3.10"
pygments = "^2.18.0"
redirects_cli ="^0.1.3"
sphinx-scylladb-theme = "^1.8.10"
sphinx-scylladb-theme = "^1.8.9"
sphinx-sitemap = "^2.6.0"
sphinx-autobuild = "^2024.4.19"
Sphinx = "^7.3.7"

View File

@@ -143,7 +143,6 @@ public:
gms::feature tablet_incremental_repair { *this, "TABLET_INCREMENTAL_REPAIR"sv };
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
gms::feature tablet_repair_tasks_table { *this, "TABLET_REPAIR_TASKS_TABLE"sv };
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };

View File

@@ -38,7 +38,6 @@ debian_base_packages=(
python3-aiohttp
python3-pyparsing
python3-colorama
python3-dev
python3-tabulate
python3-pytest
python3-pytest-asyncio
@@ -66,7 +65,6 @@ debian_base_packages=(
git-lfs
e2fsprogs
fuse3
libev-dev # for python driver
)
fedora_packages=(
@@ -92,7 +90,6 @@ fedora_packages=(
patchelf
python3
python3-aiohttp
python3-devel
python3-pip
python3-file-magic
python3-colorama
@@ -157,8 +154,6 @@ fedora_packages=(
https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm
elfutils
jq
libev-devel # for python driver
)
fedora_python3_packages=(

View File

@@ -14,6 +14,7 @@
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
#include "utils/UUID_gen.hh"
#include <variant>
#include "utils/overloaded_functor.hh"
@@ -90,6 +91,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
.scope = "cluster",
.state = get_state(entry),
.is_abortable = co_await is_abortable(std::move(hint)),
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(id.uuid())),
.start_time = entry.start_time,
.end_time = entry.end_time,
.error = entry.error,
@@ -167,6 +169,7 @@ future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
.table = "",
.entity = "",
.shard = 0,
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(id)),
.start_time = entry.start_time,
.end_time = entry.end_time
};

View File

@@ -3844,83 +3844,3 @@ future<uint32_t> repair_service::get_next_repair_meta_id() {
locator::host_id repair_service::my_host_id() const noexcept {
return _gossiper.local().my_host_id();
}
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
if (ranges1.empty() || ranges2.empty()) {
co_return 0;
}
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
if (a.first_token != b.first_token) {
return a.first_token < b.first_token;
}
return a.last_token < b.last_token;
});
};
// First, merge overlapping and adjacent ranges in ranges2.
sort(ranges2);
utils::chunked_vector<tablet_token_range> merged;
merged.push_back(ranges2[0]);
for (size_t i = 1; i < ranges2.size(); ++i) {
co_await coroutine::maybe_yield();
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
} else {
merged.push_back(ranges2[i]);
}
}
// Count covered ranges using a linear scan
size_t covered_count = 0;
auto it = merged.begin();
auto end = merged.end();
sort(ranges1);
for (const auto& r1 : ranges1) {
co_await coroutine::maybe_yield();
// Advance the merged iterator only if the current merged range ends
// before the current r1 starts.
while (it != end && it->last_token < r1.first_token) {
co_await coroutine::maybe_yield();
++it;
}
// If we have exhausted the merged ranges, no further r1 can be covered
if (it == end) {
break;
}
// Check if the current merged range covers r1.
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
covered_count++;
}
}
co_return covered_count;
}
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
utils::chunked_vector<tablet_token_range> requested_tablets;
utils::chunked_vector<tablet_token_range> finished_tablets;
table_id tid;
if (!_db.local().features().tablet_repair_tasks_table) {
co_return std::nullopt;
}
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
requested_tablets.push_back({entry.first_token, entry.last_token});
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
finished_tablets.push_back({entry.first_token, entry.last_token});
}
tid = entry.table_uuid;
co_return;
});
auto requested = requested_tablets.size();
auto finished_nomerge = finished_tablets.size();
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
auto progress = repair_task_progress{requested, finished, tid};
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}

View File

@@ -99,15 +99,6 @@ public:
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
struct repair_task_progress {
size_t requested;
size_t finished;
table_id table_uuid;
float progress() const {
return requested == 0 ? 1.0 : float(finished) / requested;
}
};
class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<service::topology_state_machine>& _tsm;
sharded<gms::gossiper>& _gossiper;
@@ -231,9 +222,6 @@ private:
public:
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas, locator::tablet_transition_stage stage);
future<std::optional<repair_task_progress>> get_tablet_repair_task_progress(tasks::task_id task_uuid);
private:
future<repair_update_system_table_response> repair_update_system_table_handler(
@@ -338,12 +326,3 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
schema_ptr s, uint64_t seed, repair_master is_master,
reader_permit permit, repair_hasher hasher);
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization = std::nullopt, repair_meta* rm = nullptr);
// A struct to hold the first and last token of a tablet.
struct tablet_token_range {
int64_t first_token;
int64_t last_token;
};
// Function to count the number of ranges in ranges1 covered by the merged ranges of ranges2.
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2);

View File

@@ -297,17 +297,17 @@ public:
const dht::token_range& token_range() const noexcept;
size_t memtable_count() const;
size_t memtable_count() const noexcept;
const compaction_group_ptr& main_compaction_group() const noexcept;
const std::vector<compaction_group_ptr>& split_ready_compaction_groups() const;
compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept;
uint64_t live_disk_space_used() const;
uint64_t live_disk_space_used() const noexcept;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const;
utils::small_vector<compaction_group_ptr, 3> compaction_groups();
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept;
utils::small_vector<compaction_group_ptr, 3> compaction_groups() noexcept;
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const noexcept;
utils::small_vector<compaction_group_ptr, 3> split_unready_groups() const;
bool split_unready_groups_are_empty() const;
@@ -430,7 +430,7 @@ public:
virtual storage_group& storage_group_for_token(dht::token) const = 0;
virtual utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;

View File

@@ -2810,26 +2810,26 @@ future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& shar
});
}
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
if (!opts.skip_flush) {
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, bool skip_flush) {
if (!skip_flush) {
co_await flush_table_on_all_shards(sharded_db, uuid);
}
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag, opts);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
}
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), opts] (auto& table_name) {
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
});
}
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts) {
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), opts] (const auto& pair) -> future<> {
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
auto uuid = pair.second->id();
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
});
}
@@ -2951,12 +2951,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
auto name = snapshot_name_opt.value_or(
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
// Use the sstable identifier in snapshot names to allow de-duplication of sstables
// at backup time even if they were migrated across shards or nodes and were renamed a given a new generation.
// We hard-code that here since we have no way to pass this option to auto-snapshot and
// it is always safe to use the sstable identifier for the sstable generation.
auto opts = db::snapshot_options{.use_sstable_identifier = true};
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
}
co_await sharded_db.invoke_on_all([&] (database& db) {

View File

@@ -1040,12 +1040,12 @@ public:
private:
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
future<snapshot_file_set> take_snapshot(sstring jsondir, db::snapshot_options opts);
future<snapshot_file_set> take_snapshot(sstring jsondir);
// Writes the table schema and the manifest of all files in the snapshot directory.
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
public:
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
@@ -1133,7 +1133,7 @@ public:
// The tablet filter is used to not double account migrating tablets, so it's important that
// only one of pending or leaving replica is accounted based on current migration stage.
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
const db::view::stats& get_view_stats() const {
return _view_stats;
@@ -2009,9 +2009,9 @@ public:
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, bool skip_flush);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
public:
bool update_column_family(schema_ptr s);

View File

@@ -234,12 +234,18 @@ distributed_loader::get_sstables_from_upload_dir(sharded<replica::database>& db,
}
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, type, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) {
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables), &get_abort_src, &db] (auto& global_table, auto& directory) {
return directory.start(global_table.as_sharded_parameter(),
sharded_parameter([bucket, endpoint, type, prefix, &get_abort_src] {
sharded_parameter([bucket, endpoint, prefix, &get_abort_src, &db] {
auto eps = db.local().get_config().object_storage_endpoints()
| std::views::filter([&endpoint](auto& ep) { return ep.key() == endpoint; })
;
if (eps.empty()) {
throw std::invalid_argument(fmt::format("Undefined endpoint {}", endpoint));
}
seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr;
auto opts = data_dictionary::make_object_storage_options(endpoint, type, bucket, prefix, as);
auto opts = data_dictionary::make_object_storage_options(endpoint, eps.front().type(), bucket, prefix, as);
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
}),
sstables,

View File

@@ -92,7 +92,7 @@ public:
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
static future<> process_upload_dir(sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
};

View File

@@ -708,7 +708,7 @@ public:
return *_single_sg;
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const override {
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
return locator::combined_load_stats{
.table_ls = locator::table_load_stats{
.size_in_bytes = _single_sg->live_disk_space_used(),
@@ -874,7 +874,7 @@ public:
return storage_group_for_id(storage_group_of(token).first);
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const override;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
@@ -922,7 +922,7 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran
return _main_cg;
}
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const {
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept {
action(_main_cg);
for (auto& cg : _merging_groups) {
action(cg);
@@ -932,7 +932,7 @@ void storage_group::for_each_compaction_group(std::function<void(const compactio
}
}
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() {
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() noexcept {
utils::small_vector<compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -940,7 +940,7 @@ utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups()
return cgs;
}
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const {
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const noexcept {
utils::small_vector<const_compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -1890,7 +1890,7 @@ sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() co
return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
}
uint64_t storage_group::live_disk_space_used() const {
uint64_t storage_group::live_disk_space_used() const noexcept {
auto cgs = const_cast<storage_group&>(*this).compaction_groups();
return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
}
@@ -2813,7 +2813,7 @@ void table::on_flush_timer() {
});
}
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::table_load_stats table_stats;
table_stats.split_ready_seq_number = _split_ready_seq_number;
@@ -2836,7 +2836,7 @@ locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std:
};
}
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
return _sg_manager->table_load_stats(std::move(tablet_filter));
}
@@ -3268,7 +3268,7 @@ future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstrin
}
// Runs the orchestration code on an arbitrary shard to balance the load.
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
if (so == nullptr) {
throw std::runtime_error("Snapshotting non-local tables is not implemented");
@@ -3291,7 +3291,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
return table_shards->take_snapshot(jsondir, opts);
return table_shards->take_snapshot(jsondir);
}));
});
co_await io_check(sync_directory, jsondir);
@@ -3300,22 +3300,19 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
});
}
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir, db::snapshot_options opts) {
tlogger.trace("take_snapshot {}: use_sstable_identifier={}", jsondir, opts.use_sstable_identifier);
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
tlogger.trace("take_snapshot {}", jsondir);
auto sstable_deletion_guard = co_await get_sstable_list_permit();
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
auto table_names = std::make_unique<std::unordered_set<sstring>>();
auto& ks_name = schema()->ks_name();
auto& cf_name = schema()->cf_name();
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&, opts] (sstables::shared_sstable sstable) -> future<> {
auto gen = co_await io_check([sstable, &dir = jsondir, opts] {
return sstable->snapshot(dir, opts.use_sstable_identifier);
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return io_check([sstable, &dir = jsondir] {
return sstable->snapshot(dir);
});
auto fname = sstable->component_basename(ks_name, cf_name, sstable->get_version(), gen, sstable->get_format(), sstables::component_type::Data);
table_names->insert(fname);
});
co_return make_foreign(std::move(table_names));
}
@@ -3456,7 +3453,7 @@ size_t compaction_group::memtable_count() const noexcept {
return _memtables->size();
}
size_t storage_group::memtable_count() const {
size_t storage_group::memtable_count() const noexcept {
return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
}

View File

@@ -38,9 +38,8 @@ for required in jq curl; do
fi
done
FORCE=0
ALLOW_SUBMODULE=0
ALLOW_UNSTABLE=0
ALLOW_ANY_BRANCH=0
function print_usage {
cat << EOF
@@ -61,18 +60,12 @@ Options:
-h
Print this help message and exit.
--allow-submodule
Allow a PR to update a submudule
--allow-unstable
--force
Do not check current branch to be next*
Do not check jenkins job status
--allow-any-branch
Merge PR even if target branch is not next
--force
Sets all above --allow-* options
--allow-submodule
Allow a PR to update a submudule
EOF
}
@@ -80,23 +73,13 @@ while [[ $# -gt 0 ]]
do
case $1 in
"--force"|"-f")
ALLOW_UNSTABLE=1
ALLOW_SUBMODULE=1
ALLOW_ANY_BRANCH=1
FORCE=1
shift 1
;;
--allow-submodule)
ALLOW_SUBMODULE=1
shift
;;
--allow-unstable)
ALLOW_UNSTABLE=1
shift
;;
--allow-any-branch)
ALLOW_ANY_BRANCH=1
shift
;;
+([0-9]))
PR_NUM=$1
shift 1
@@ -164,7 +147,7 @@ check_jenkins_job_status() {
fi
}
if [[ $ALLOW_UNSTABLE -eq 0 ]]; then
if [[ $FORCE -eq 0 ]]; then
check_jenkins_job_status
fi
@@ -196,19 +179,17 @@ echo -n "Fetching full name of author $PR_LOGIN... "
USER_NAME=$(curl -s "https://api.github.com/users/$PR_LOGIN" | jq -r .name)
echo "$USER_NAME"
if [[ $ALLOW_ANY_BRANCH -eq 0 ]]; then
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TARGET_BASE="unknown"
if [[ ${BASE_BRANCH} == master ]]; then
TARGET_BASE="next"
elif [[ ${BASE_BRANCH} == branch-* ]]; then
TARGET_BASE=${BASE_BRANCH//branch/next}
fi
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}. Use --allow-any-branch or --force to skip this check"
exit 1
fi
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TARGET_BASE="unknown"
if [[ ${BASE_BRANCH} == master ]]; then
TARGET_BASE="next"
elif [[ ${BASE_BRANCH} == branch-* ]]; then
TARGET_BASE=${BASE_BRANCH//branch/next}
fi
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}"
exit 1
fi
git fetch "$REMOTE" pull/$PR_NUM/head

View File

@@ -1138,8 +1138,7 @@ private:
topology_mutation_builder builder(guard.write_timestamp());
topology_request_tracking_mutation_builder trbuilder(global_request_id, _sp._features.topology_requests_type_column);
trbuilder.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now());
.set("done", false);
if (!_sp._features.topology_global_request_queue) {
builder.set_global_topology_request(global_topology_request::truncate_table)

View File

@@ -4940,7 +4940,6 @@ future<> storage_service::do_clusterwide_vnodes_cleanup() {
builder.queue_global_topology_request_id(request_id);
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
rtbuilder.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::cleanup);
muts.push_back(rtbuilder.build());
} else {
@@ -5265,7 +5264,6 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
builder.queue_global_topology_request_id(request_id);
rtbuilder.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::new_cdc_generation);
muts.push_back(rtbuilder.build());
} else {
@@ -6822,7 +6820,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
});
}
auto ts = db_clock::now();
for (const auto& token : tokens) {
auto tid = tmap.get_tablet_id(token);
auto& tinfo = tmap.get_tablet_info(tid);
@@ -6836,20 +6833,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
.set_repair_task_info(last_token, repair_task_info, _feature_service)
.build());
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::requested,
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
.timestamp = ts,
.table_uuid = table,
};
if (_feature_service.tablet_repair_tasks_table) {
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
for (auto& m : cmuts) {
updates.push_back(std::move(m));
}
}
}
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);

View File

@@ -136,17 +136,6 @@ db::tablet_options combine_tablet_options(R&& opts) {
return combined_opts;
}
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
auto tokens_view = s | std::views::split(delimiter)
| std::views::transform([](auto&& range) {
return std::string_view(&*range.begin(), std::ranges::distance(range));
})
| std::views::transform([](std::string_view sv) {
return locator::tablet_id(std::stoul(std::string(sv)));
});
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}
// Used to compare different migration choices in regard to impact on load imbalance.
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
struct migration_badness {
@@ -904,8 +893,6 @@ public:
co_await coroutine::maybe_yield();
auto& config = tmap.repair_scheduler_config();
auto now = db_clock::now();
auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
auto gid = locator::global_tablet_id{table, id};
// Skip tablet that is in transitions.
@@ -926,11 +913,6 @@ public:
co_return;
}
if (skip_tablets.contains(id)) {
lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
co_return;
}
// Avoid rescheduling a failed tablet repair in a loop
// TODO: Allow user to config
const auto min_reschedule_time = std::chrono::seconds(5);

View File

@@ -10,10 +10,10 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "repair/row_level.hh"
#include "service/task_manager_module.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/UUID_gen.hh"
#include <seastar/coroutine/maybe_yield.hh>
namespace service {
@@ -58,9 +58,14 @@ static std::optional<tasks::task_stats> maybe_make_task_stats(const locator::tab
.kind = tasks::task_kind::cluster,
.scope = get_scope(task_info.request_type),
.state = tasks::task_manager::task_state::running,
.sequence_number = 0,
.keyspace = schema->ks_name(),
.table = schema->cf_name(),
.start_time = task_info.request_time
.entity = "",
.shard = 0,
.creation_time = task_info.request_time,
.start_time = task_info.sched_time,
.end_time = db_clock::time_point{}
};
}
@@ -110,16 +115,6 @@ future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(ta
tid = tmap.next_tablet(*tid);
}
}
// Check if the task id is present in the repair task table
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
if (progress && progress->requested > 0) {
co_return tasks::virtual_task_hint{
.table_id = progress->table_uuid,
.task_type = locator::tablet_task_type::user_repair,
.tablet_id = std::nullopt,
};
}
co_return std::nullopt;
}
@@ -236,7 +231,8 @@ static void update_status(const locator::tablet_task_info& task_info, tasks::tas
sched_nr += task_info.sched_nr;
status.type = locator::tablet_task_type_to_string(task_info.request_type);
status.scope = get_scope(task_info.request_type);
status.start_time = task_info.request_time;
status.creation_time = task_info.request_time;
status.start_time = task_info.sched_time;
}
future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(tasks::task_id id, tasks::virtual_task_hint hint) {
@@ -254,20 +250,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
size_t sched_nr = 0;
auto tmptr = _ss.get_token_metadata_ptr();
auto& tmap = tmptr->tablets().get_tablet_map(table);
bool repair_task_finished = false;
bool repair_task_pending = false;
if (is_repair_task(task_type)) {
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
if (progress) {
res.status.progress.completed = progress->finished;
res.status.progress.total = progress->requested;
res.status.progress_units = "tablets";
if (progress->requested > 0 && progress->requested == progress->finished) {
repair_task_finished = true;
} if (progress->requested > 0 && progress->requested > progress->finished) {
repair_task_pending = true;
}
}
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto& task_info = info.repair_task_info;
if (task_info.tablet_task_id.uuid() == id.uuid()) {
@@ -299,17 +282,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
co_return res;
}
if (repair_task_pending) {
// When repair_task_pending is true, the res.tablets will be empty iff the request is aborted by user.
res.status.state = res.tablets.empty() ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::running;
co_return res;
}
if (repair_task_finished) {
res.status.state = tasks::task_manager::task_state::done;
co_return res;
}
// FIXME: Show finished tasks.
co_return std::nullopt;
}

View File

@@ -956,6 +956,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
req_entry = co_await _sys_ks.get_topology_request_entry(req_id, true);
req = std::get<global_topology_request>(req_entry.request_type);
}
switch (req) {
case global_topology_request::new_cdc_generation: {
rtlogger.info("new CDC generation requested");
@@ -975,9 +976,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
// Set start_time when we begin executing the request
topology_request_tracking_mutation_builder rtbuilder(req_id);
rtbuilder.set("start_time", db_clock::now());
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build(), rtbuilder.build()}, reason);
}
break;
case global_topology_request::cleanup:
@@ -1068,7 +1074,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.build()));
// Set start_time when we begin executing the request and mark as done
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
.set("start_time", db_clock::now())
.done(error)
.build()));
@@ -1088,7 +1096,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.set_session(session_id(req_id));
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
// Set start_time when we begin executing the request
topology_request_tracking_mutation_builder rtbuilder(req_id);
rtbuilder.set("start_time", db_clock::now());
co_await update_topology_state(std::move(guard), {builder.build(), rtbuilder.build()}, "TRUNCATE TABLE requested");
}
break;
}
@@ -1205,8 +1218,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
// Record the repair task update muations
utils::chunked_vector<canonical_mutation> repair_task_updates;
service::session_id session_id;
};
@@ -1739,14 +1750,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
dst = dst_opt.value().host;
}
// Update repair task
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(tinfo.repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::finished,
.first_token = dht::token::to_int64(tmap.get_first_token(gid.tablet)),
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
.table_uuid = gid.table,
};
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
@@ -1755,10 +1758,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
if (_feature_service.tablet_repair_tasks_table) {
entry.timestamp = db_clock::now();
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
}
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
dst, tablet, duration, res.repair_time);
})) {
@@ -1777,9 +1776,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_repair_task_info(last_token, _feature_service)
.del_session(last_token);
for (auto& m : tablet_state.repair_task_updates) {
updates.push_back(std::move(m));
}
// Skip update repair time in case hosts filter or dcs filter is set.
if (valid && is_filter_off) {
auto sched_time = tinfo.repair_task_info.sched_time;
@@ -3296,6 +3292,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
topology_mutation_builder builder(guard.write_timestamp());
builder.del_global_topology_request();
if (_feature_service.topology_global_request_queue) {
// Set start_time when we begin executing the request
topology_request_tracking_mutation_builder start_rtbuilder(*global_request_id);
start_rtbuilder.set("start_time", db_clock::now());
muts.emplace_back(start_rtbuilder.build());
topology_request_tracking_mutation_builder rtbuilder(*global_request_id);
builder.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, *global_request_id);

View File

@@ -2117,14 +2117,11 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
}
sstable_id sid;
// Force a random sstable_id for testing purposes
bool random_sstable_identifier = utils::get_local_injector().is_enabled("random_sstable_identifier");
if (!random_sstable_identifier && generation().is_uuid_based()) {
if (generation().is_uuid_based()) {
sid = sstable_id(generation().as_uuid());
} else {
sid = sstable_id(utils::UUID_gen::get_time_UUID());
auto msg = random_sstable_identifier ? "forced random sstable_id" : "has numerical generation";
sstlog.info("SSTable {} {}. SSTable identifier in scylla_metadata set to {}", get_filename(), msg, sid);
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
}
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableIdentifier>(scylla_metadata::sstable_identifier{sid});
@@ -2488,6 +2485,11 @@ void sstable::validate_originating_host_id() const {
}
return;
}
if (*originating_host_id != local_host_id) {
// FIXME refrain from throwing an exception because of #10148
sstlog.warn("Host id {} does not match local host id {} while validating SSTable: {}. Load foreign SSTables via the upload dir instead.", *originating_host_id, local_host_id, get_filename());
}
}
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
@@ -2538,11 +2540,8 @@ std::vector<std::pair<component_type, sstring>> sstable::all_components() const
return all;
}
future<generation_type> sstable::snapshot(const sstring& dir, bool use_sstable_identifier) const {
// Use the sstable identifier UUID if available to enable global de-duplication of sstables in backup.
generation_type gen = (use_sstable_identifier && _sstable_identifier) ? generation_type(_sstable_identifier->uuid()) : _generation;
co_await _storage->snapshot(*this, dir, storage::absolute_path::yes, gen);
co_return gen;
future<> sstable::snapshot(const sstring& dir) const {
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
}
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {

View File

@@ -397,10 +397,6 @@ public:
return _version;
}
format_types get_format() const {
return _format;
}
// Returns the total bytes of all components.
uint64_t bytes_on_disk() const;
file_size_stats get_file_size_stats() const;
@@ -442,10 +438,7 @@ public:
std::vector<std::pair<component_type, sstring>> all_components() const;
// When use_sstable_identifier is true and the sstable identifier is available,
// use it to name the sstable in the snapshot, rather than the sstable generation.
// Returns the generation used for snapshot.
future<generation_type> snapshot(const sstring& dir, bool use_sstable_identifier = false) const;
future<> snapshot(const sstring& dir) const;
// Delete the sstable by unlinking all sstable files
// Ignores all errors.

View File

@@ -135,17 +135,13 @@ future<> storage_manager::update_config(const db::config& cfg) {
co_return;
}
auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& {
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
auto found = _object_storage_endpoints.find(endpoint);
if (found == _object_storage_endpoints.end()) {
smlogger.error("unable to find {} in configured object-storage endpoints", endpoint);
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
return found->second;
}
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
auto& ep = get_endpoint(endpoint);
auto& ep = found->second;
if (ep.client == nullptr) {
ep.client = make_object_storage_client(ep.cfg, _object_storage_clients_memory, [&ct = container()] (std::string ep) {
return ct.local().get_endpoint_client(ep);
@@ -154,10 +150,6 @@ shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client
return ep.client;
}
sstring storage_manager::get_endpoint_type(sstring endpoint) {
return get_endpoint(endpoint).cfg.type();
}
bool storage_manager::is_known_endpoint(sstring endpoint) const {
return _object_storage_endpoints.contains(endpoint);
}

View File

@@ -70,7 +70,6 @@ class storage_manager : public peering_sharded_service<storage_manager> {
seastar::metrics::metric_groups metrics;
future<> update_config(const db::config&);
object_storage_endpoint& get_endpoint(const sstring& ep);
public:
struct config {
@@ -81,7 +80,6 @@ public:
storage_manager(const db::config&, config cfg);
shared_ptr<object_storage_client> get_endpoint_client(sstring endpoint);
bool is_known_endpoint(sstring endpoint) const;
sstring get_endpoint_type(sstring endpoint);
future<> stop();
std::vector<sstring> endpoints(sstring type = "") const noexcept;
};

View File

@@ -205,13 +205,6 @@ private:
}
bool tablet_in_scope(locator::tablet_id) const;
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
};
host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
@@ -350,52 +343,55 @@ public:
}
};
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
auto tablets_sstables =
tablets_ranges | std::views::transform([](auto range) { return tablet_sstable_collection{.tablet_range = range}; }) | std::ranges::to<std::vector>();
if (sstables.empty() || tablets_sstables.empty()) {
co_return std::move(tablets_sstables);
}
// sstables are sorted by first key in reverse order.
auto reversed_sstables = sstables | std::views::reverse;
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) {
for (const auto& sst : reversed_sstables) {
auto sst_first = sst->get_first_decorated_key().token();
auto sst_last = sst->get_last_decorated_key().token();
// SSTable entirely after tablet -> no further SSTables (larger keys) can overlap
if (tablet_range.after(sst_first, dht::token_comparator{})) {
break;
}
// SSTable entirely before tablet -> skip and continue scanning later (larger keys)
if (tablet_range.before(sst_last, dht::token_comparator{})) {
continue;
}
if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) {
sstables_fully_contained.push_back(sst);
} else {
sstables_partially_contained.push_back(sst);
}
co_await coroutine::maybe_yield();
}
}
co_return std::move(tablets_sstables);
}
future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
if (progress) {
progress->start(_tablet_map.tablet_count());
}
auto classified_sstables = co_await get_sstables_for_tablets(
_sstables, _tablet_map.tablet_ids() | std::views::filter([this](auto tid) { return tablet_in_scope(tid); }) | std::views::transform([this](auto tid) {
return _tablet_map.get_token_range(tid);
}) | std::ranges::to<std::vector>());
// sstables are sorted by first key in reverse order.
auto sstable_it = _sstables.rbegin();
for (auto tablet_id : _tablet_map.tablet_ids() | std::views::filter([this] (auto tid) { return tablet_in_scope(tid); })) {
auto tablet_range = _tablet_map.get_token_range(tablet_id);
auto sstable_token_range = [] (const sstables::shared_sstable& sst) {
return dht::token_range(sst->get_first_decorated_key().token(),
sst->get_last_decorated_key().token());
};
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
// sstable is exhausted if its last key is before the current tablet range
auto exhausted = [&tablet_range] (const sstables::shared_sstable& sst) {
return tablet_range.before(sst->get_last_decorated_key().token(), dht::token_comparator{});
};
while (sstable_it != _sstables.rend() && exhausted(*sstable_it)) {
sstable_it++;
}
for (auto sst_it = sstable_it; sst_it != _sstables.rend(); sst_it++) {
auto sst_token_range = sstable_token_range(*sst_it);
// sstables are sorted by first key, so should skip this SSTable since it
// doesn't overlap with the current tablet range.
if (!tablet_range.overlaps(sst_token_range, dht::token_comparator{})) {
// If the start of the next SSTable's token range lies beyond the current tablet's token
// range, we can safely conclude that no more relevant SSTables remain for this tablet.
if (tablet_range.after(sst_token_range.start()->value(), dht::token_comparator{})) {
break;
}
continue;
}
if (tablet_range.contains(sst_token_range, dht::token_comparator{})) {
sstables_fully_contained.push_back(*sst_it);
} else {
sstables_partially_contained.push_back(*sst_it);
}
co_await coroutine::maybe_yield();
}
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : classified_sstables) {
auto per_tablet_progress = make_shared<per_tablet_stream_progress>(
progress,
sstables_fully_contained.size() + sstables_partially_contained.size());
@@ -755,9 +751,8 @@ future<> sstables_loader::download_task_impl::run() {
};
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
auto ep_type = _loader.local()._storage_manager.get_endpoint_type(_endpoint);
std::vector<seastar::abort_source> shard_aborts(smp::count);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, ep_type, _bucket, _prefix, cfg, [&] {
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg, [&] {
return &shard_aborts[this_shard_id()];
});
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
@@ -837,7 +832,3 @@ future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, s
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
co_return task->id();
}
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));
}

View File

@@ -10,8 +10,6 @@
#include <vector>
#include <seastar/core/sharded.hh>
#include "dht/i_partitioner_fwd.hh"
#include "dht/token.hh"
#include "schema/schema_fwd.hh"
#include "sstables/shared_sstable.hh"
#include "tasks/task_manager.hh"
@@ -154,18 +152,3 @@ struct fmt::formatter<sstables_loader::stream_scope> : fmt::formatter<string_vie
}
}
};
struct tablet_sstable_collection {
dht::token_range tablet_range;
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
};
// This function is intended for test purposes only.
// It assigns the given sstables to the given tablet ranges based on token containment.
// It returns a vector of tablet_sstable_collection, each containing the tablet range
// and the sstables that are fully or partially contained within that range.
// The prerequisite is the tablet ranges are sorted by the range in ascending order and non-overlapping.
// Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order.
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);

View File

@@ -10,6 +10,7 @@
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/overloaded_functor.hh"
#include "utils/UUID_gen.hh"
#include <seastar/core/with_timeout.hh>
@@ -19,6 +20,11 @@ namespace tasks {
using task_status_variant = std::variant<tasks::task_manager::foreign_task_ptr, tasks::task_manager::task::task_essentials>;
static db_clock::time_point get_creation_time_from_task_id(task_id id) {
// Task IDs are timeuuids (version 1 UUIDs), so we can extract the timestamp from them
return db_clock::time_point(utils::UUID_gen::unix_timestamp(id.uuid()));
}
static future<task_status> get_task_status(task_manager::task_ptr task) {
auto host_id = task->get_module()->get_task_manager().get_host_id();
auto local_task_status = task->get_status();
@@ -29,6 +35,7 @@ static future<task_status> get_task_status(task_manager::task_ptr task) {
.scope = local_task_status.scope,
.state = local_task_status.state,
.is_abortable = task->is_abortable(),
.creation_time = get_creation_time_from_task_id(local_task_status.id),
.start_time = local_task_status.start_time,
.end_time = local_task_status.end_time,
.error = local_task_status.error,
@@ -173,6 +180,7 @@ future<utils::chunked_vector<task_status>> task_handler::get_status_recursively(
.scope = task.task_status.scope,
.state = task.task_status.state,
.is_abortable = task.abortable,
.creation_time = get_creation_time_from_task_id(task.task_status.id),
.start_time = task.task_status.start_time,
.end_time = task.task_status.end_time,
.error = task.task_status.error,

View File

@@ -26,6 +26,7 @@ struct task_status {
std::string scope;
task_manager::task_state state;
is_abortable is_abortable;
db_clock::time_point creation_time;
db_clock::time_point start_time;
db_clock::time_point end_time;
std::string error;
@@ -51,6 +52,7 @@ struct task_stats {
std::string table;
std::string entity;
unsigned shard;
db_clock::time_point creation_time;
db_clock::time_point start_time;
db_clock::time_point end_time;
};

View File

@@ -21,6 +21,7 @@
#include "utils/assert.hh"
#include "utils/chunked_vector.hh"
#include "utils/overloaded_functor.hh"
#include "utils/UUID_gen.hh"
#include "service/storage_service.hh"
#include "tasks/task_handler.hh"
#include "task_manager.hh"
@@ -559,6 +560,7 @@ future<utils::chunked_vector<task_stats>> task_manager::module::get_stats(is_int
.table = task->get_status().table,
.entity = task->get_status().entity,
.shard = task->get_status().shard,
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(task->id().uuid())),
.start_time = task->get_status().start_time,
.end_time = task->get_status().end_time,
});

View File

@@ -370,7 +370,6 @@ add_scylla_test(combined_tests
sstable_compression_config_test.cc
sstable_directory_test.cc
sstable_set_test.cc
sstable_tablet_streaming.cc
statement_restrictions_test.cc
storage_proxy_test.cc
tablets_test.cc

View File

@@ -31,7 +31,6 @@
#include "replica/database.hh"
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "utils/rjson.hh"
#include "partition_slice_builder.hh"
#include "mutation/frozen_mutation.hh"
#include "test/lib/mutation_source_test.hh"
@@ -39,7 +38,6 @@
#include "service/migration_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/generation_type.hh"
#include "sstables/sstable_version.hh"
#include "db/config.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
@@ -53,7 +51,6 @@
#include "db/system_keyspace.hh"
#include "db/view/view_builder.hh"
#include "replica/mutation_dump.hh"
#include "utils/error_injection.hh"
using namespace std::chrono_literals;
using namespace sstables;
@@ -615,13 +612,13 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
});
}
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", bool skip_flush = false) {
try {
auto uuid = e.db().local().find_uuid(ks_name, cf_name);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, skip_flush);
} catch (...) {
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={} use_sstable_identifier={}: {}",
ks_name, cf_name, snapshot_name, opts.skip_flush, opts.use_sstable_identifier, std::current_exception());
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
throw;
}
}
@@ -635,37 +632,6 @@ future<std::set<sstring>> collect_files(fs::path path) {
co_return ret;
}
static bool is_component(const sstring& fname, const sstring& suffix) {
return fname.ends_with(suffix);
}
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
// Verify manifest against the files in the snapshots dir
auto pred = [&suffix] (const sstring& fname) {
return is_component(fname, suffix);
};
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
}
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
static future<> validate_manifest(const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir) {
sstring suffix = "-Data.db";
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
std::set<sstring> sstables_in_manifest;
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
auto manifest_json = rjson::parse(manifest_str);
auto& manifest_files = manifest_json["files"];
BOOST_REQUIRE(manifest_files.IsArray());
for (auto& f : manifest_files.GetArray()) {
if (is_component(f.GetString(), suffix)) {
sstables_in_manifest.insert(f.GetString());
}
}
testlog.debug("SSTables in manifest.json: {}", fmt::join(sstables_in_manifest, ", "));
BOOST_REQUIRE_EQUAL(sstables_in_snapshot, sstables_in_manifest);
}
static future<> snapshot_works(const std::string& table_name) {
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
take_snapshot(e, "ks", table_name).get();
@@ -685,8 +651,6 @@ static future<> snapshot_works(const std::string& table_name) {
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
validate_manifest(snapshot_dir, in_snapshot_dir).get();
return make_ready_future<>();
}, true);
}
@@ -705,8 +669,7 @@ SEASTAR_TEST_CASE(index_snapshot_works) {
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
db::snapshot_options opts = {.skip_flush = true};
take_snapshot(e, "ks", "cf", "test", opts).get();
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -719,41 +682,6 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
});
}
SEASTAR_TEST_CASE(snapshot_use_sstable_identifier_works) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return make_ready_future<>();
#endif
sstring table_name = "cf";
// Force random sstable identifiers, otherwise the initial sstable_id is equal
// to the sstable generation and the test can't distinguish between them.
utils::get_local_injector().enable("random_sstable_identifier", false);
return do_with_some_data({table_name}, [table_name] (cql_test_env& e) -> future<> {
sstring tag = "test";
db::snapshot_options opts = {.use_sstable_identifier = true};
co_await take_snapshot(e, "ks", table_name, tag, opts);
auto& cf = e.local_db().find_column_family("ks", table_name);
auto table_directory = table_dir(cf);
auto snapshot_dir = table_directory / sstables::snapshots_dir / tag;
auto in_table_dir = co_await collect_files(table_directory);
// snapshot triggered a flush and wrote the data down.
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
testlog.info("Files in table dir: {}", fmt::join(in_table_dir, ", "));
auto in_snapshot_dir = co_await collect_files(snapshot_dir);
testlog.info("Files in snapshot dir: {}", fmt::join(in_snapshot_dir, ", "));
in_table_dir.insert("manifest.json");
in_table_dir.insert("schema.cql");
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir.size(), in_snapshot_dir.size());
BOOST_REQUIRE_NE(in_table_dir, in_snapshot_dir);
co_await validate_manifest(snapshot_dir, in_snapshot_dir);
}, true);
}
SEASTAR_TEST_CASE(snapshot_list_okay) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -1528,7 +1456,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
}
BOOST_REQUIRE(found);
co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});
co_await take_snapshot(e, "ks", "cf", "test", true /* skip_flush */);
testlog.debug("Expected: {}", expected);

View File

@@ -1450,7 +1450,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
std::map<sstring, replication_strategy_config_option> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto rf = num_racks;
auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; }));
auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor);
options.emplace(dc, fmt::to_string(rf));
}
return options;
@@ -1486,7 +1487,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_tablets = true;
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
@@ -1500,7 +1502,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
*ars_ptr,
base_token,
view_token,
use_tablets,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats).natural_endpoint;
// view pair must be found
@@ -1522,6 +1525,181 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
}
}
// Called in a seastar thread
void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
std::map<sstring, size_t> node_count_per_dc;
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
std::vector<ring_point> ring_points;
auto& random_engine = seastar::testing::local_random_engine;
unsigned shard_count = 2;
size_t num_dcs = 1 + tests::random::get_int(3);
// Generate a random cluster
double point = 1;
for (size_t dc = 0; dc < num_dcs; ++dc) {
sstring dc_name = fmt::format("{}", 100 + dc);
size_t num_racks = 2 + tests::random::get_int(4);
for (size_t rack = 0; rack < num_racks; ++rack) {
sstring rack_name = fmt::format("{}", 10 + rack);
size_t rack_nodes = 1 + tests::random::get_int(2);
for (size_t i = 1; i <= rack_nodes; ++i) {
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
node_count_per_dc[dc_name]++;
node_count_per_rack[dc_name][rack_name]++;
point++;
}
}
}
testlog.debug("node_count_per_rack={}", node_count_per_rack);
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}).get();
auto base_schema = schema_builder("ks", "base")
.with_column("k", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
auto view_schema = schema_builder("ks", "view")
.with_column("v", utf8_type, column_kind::partition_key)
.with_column("k", utf8_type)
.build();
auto tmptr = stm.get();
// Create the replication strategy
auto make_random_options = [&] () {
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
std::map<sstring, replication_strategy_config_option> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto rf = more_or_less ?
tests::random::get_int(num_racks, node_count_per_dc[dc]) :
tests::random::get_int(1UL, num_racks);
options.emplace(dc, fmt::to_string(rf));
}
return options;
};
auto options = make_random_options();
size_t tablet_count = 1 + tests::random::get_int(99);
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params, tmptr->get_topology());
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
auto base_table_id = base_schema->id();
testlog.debug("base_table_id={}", base_table_id);
auto view_table_id = view_schema->id();
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
testlog.debug("view_table_id={}", view_table_id);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
tm.tablets().set_tablet_map(base_table_id, co_await base_tmap.clone_gently());
tm.tablets().set_tablet_map(view_table_id, co_await view_tmap.clone_gently());
}).get();
tmptr = stm.get();
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
auto& topology = tmptr->get_topology();
testlog.debug("topology: {}", topology.get_datacenter_racks());
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
std::unordered_map<sstring, size_t> same_rack_pairs;
std::unordered_map<sstring, size_t> cross_rack_pairs;
for (const auto& base_replica : base_replicas) {
auto& base_host = base_replica.host;
auto view_ep_opt = db::view::get_view_natural_endpoint(
base_host,
base_erm,
view_erm,
*ars_ptr,
base_token,
view_token,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats).natural_endpoint;
// view pair must be found
if (!view_ep_opt) {
BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token));
}
BOOST_REQUIRE(view_ep_opt);
auto& view_ep = *view_ep_opt;
// Assert pairing uniqueness
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
BOOST_REQUIRE(inserted_base_pair);
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
BOOST_REQUIRE(inserted_view_pair);
auto& base_location = topology.find_node(base_host)->dc_rack();
auto& view_location = topology.find_node(view_ep)->dc_rack();
// Assert dc- and rack- aware pairing
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
if (base_location.rack == view_location.rack) {
same_rack_pairs[base_location.dc]++;
} else {
cross_rack_pairs[base_location.dc]++;
}
}
for (const auto& [dc, rf_opt] : options) {
auto rf = locator::get_replication_factor(rf_opt);
BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf);
}
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) {
test_complex_rack_aware_view_pairing_test(false);
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) {
test_complex_rack_aware_view_pairing_test(true);
}
SEASTAR_THREAD_TEST_CASE(test_rack_diff) {
BOOST_REQUIRE(diff_racks({}, {}).empty());

View File

@@ -346,60 +346,4 @@ SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) {
});
}
SEASTAR_TEST_CASE(test_tablet_token_range_count) {
{
// Simple case: one large range covers a smaller one
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges overlap and should merge to cover r1
// r2: [0, 50] + [40, 100] -> merges to [0, 100]
// r1: [10, 90] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{10, 90}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 50}, {40, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges are adjacent (contiguous) and should merge
// r2: [0, 10] + [11, 20] -> merges to [0, 20]
// r1: [5, 15] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}, {11, 20}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r1 overlaps r2 but is not FULLY contained
// r2: [0, 10]
// r1: [5, 15] (Ends too late), [ -5, 5 ] (Starts too early)
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}, {-5, 5}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 0);
}
{
// A single merged range in r2 covers multiple distinct ranges in r1
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}, {30, 40}, {50, 60}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 3);
}
{
// Inputs are provided in random order, ensuring the internal sort works
utils::chunked_vector<tablet_token_range> r1 = {{50, 60}, {10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{50, 100}, {0, 40}};
// r2 merges effectively to [0, 40] and [50, 100]
// Both r1 items are covered
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 2);
}
{
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2_empty = {};
utils::chunked_vector<tablet_token_range> r1_empty = {};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2_empty) == 0);
BOOST_REQUIRE(co_await count_finished_tablets(r1_empty, r2) == 0);
}
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -1,367 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "dht/token.hh"
#include "sstable_test.hh"
#include "sstables_loader.hh"
#include "test/lib/sstable_test_env.hh"
BOOST_AUTO_TEST_SUITE(sstable_tablet_streaming_test)
using namespace sstables;
std::vector<shared_sstable> make_sstables_with_ranges(test_env& env, const std::vector<std::pair<int64_t, int64_t>>& ranges) {
std::vector<shared_sstable> ssts;
for (const auto& [first, last] : ranges) {
auto sst = env.make_sstable(uncompressed_schema(), uncompressed_dir());
test(sst).set_first_and_last_keys(dht::decorated_key(dht::token{first}, partition_key(std::vector<bytes>{"1"})),
dht::decorated_key(dht::token{last}, partition_key(std::vector<bytes>{"1"})));
ssts.push_back(std::move(sst));
}
// By sorting SSTables by their primary key, we enable runs to be
// streamed incrementally. Overlapping fragments can be deduplicated,
// reducing the amount of data sent over the wire. Elements are
// popped from the back of the vector, so we sort in descending
// order to begin with the smaller tokens.
// See sstable_streamer constructor for more details.
std::ranges::sort(ssts, [](const shared_sstable& x, const shared_sstable& y) { return x->compare_by_first_key(*y) > 0; });
return ssts;
}
std::vector<dht::token_range> get_tablet_sstable_collection(auto&&... tablet_ranges) {
// tablet ranges are left-non-inclusive, see `tablet_map::get_token_range` for details
std::vector<dht::token_range> collections{dht::token_range::make({tablet_ranges.start()->value(), false}, {tablet_ranges.end()->value(), true})...};
std::sort(collections.begin(), collections.end(), [](auto const& a, auto const& b) { return a.start()->value() < b.start()->value(); });
return collections;
}
#define REQUIRE_WITH_CONTEXT(sstables, expected_size) \
BOOST_TEST_CONTEXT("Testing with ranges: " << [&] { \
std::stringstream ss; \
for (const auto& sst : (sstables)) { \
ss << dht::token_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token()) << ", "; \
} \
return ss.str(); \
}()) \
BOOST_REQUIRE_EQUAL(sstables.size(), expected_size)
SEASTAR_TEST_CASE(test_streaming_ranges_distribution) {
return test_env::do_with_async([](test_env& env) {
// 1) Exact boundary equality: SSTable == tablet
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{5, 10},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 2) Single-point overlaps at start/end
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{4, 5}, // touches start, non-inclusive, skip
{10, 11}, // touches end
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 3) Tablet fully inside a large SSTable
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 20},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 4) Multiple SSTables fully contained in tablet
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{6, 7},
{7, 8},
{8, 9},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 3);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 5) Two overlapping but not fully contained SSTables
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 6}, // overlaps at left
{9, 15}, // overlaps at right
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 6) Unsorted input (helper sorts) + mixed overlaps
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{50}, dht::token{100}});
// Intentionally unsorted by first token
auto ssts = make_sstables_with_ranges(env,
{
{120, 130},
{0, 10},
{60, 70}, // fully contained
{40, 55}, // partial
{95, 105}, // partial
{80, 90}, // fully contained
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 7) Empty SSTable list
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
std::vector<shared_sstable> ssts;
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 8) Tablet outside all SSTables
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
auto ssts = make_sstables_with_ranges(env,
{
{1, 2},
{3, 4},
{10, 20},
{300, 400},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 9) Boundary adjacency with multiple fragments
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
auto ssts = make_sstables_with_ranges(env,
{
{50, 100}, // touches start -> non-inclusive, skip
{100, 120}, // starts at start -> partially contained
{180, 200}, // ends at end -> fully contained
{200, 220}, // touches end -> partial
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 1);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 10) Large SSTable set where early break should occur
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{1000}, dht::token{2000}});
auto ssts = make_sstables_with_ranges(env,
{
{100, 200},
{300, 400},
{900, 950},
{1001, 1100}, // fully contained
{1500, 1600}, // fully contained
{2101, 2200}, // entirely after -> should trigger early break in ascending scan
{1999, 2100}, // overlap, partially contained
{3000, 3100},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 10) https://github.com/scylladb/scylladb/pull/26980 example, tested
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{4}, dht::token{5}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 5},
{0, 3},
{2, 5},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
// None fully contained; three partial overlaps
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
});
}
SEASTAR_TEST_CASE(test_streaming_ranges_distribution_in_tablets) {
return test_env::do_with_async([](test_env& env) {
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}}, dht::token_range{dht::token{11}, dht::token{15}});
auto ssts = make_sstables_with_ranges(env,
{
{5, 10},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
}
{
// Multiple tablets with a hole between [10,11]
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}},
dht::token_range{dht::token{5}, dht::token{9}},
dht::token_range{dht::token{12}, dht::token{15}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 4}, // T.start==S.start, but non-inclusive -> partial
{5, 9}, // same as above
{6, 8}, // fully in second tablet
{10, 11}, // falls in the hole, should be rejected
{8, 13}, // overlaps second and third tablets (partial in both)
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 2);
REQUIRE_WITH_CONTEXT(res[2].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
}
{
// SSTables outside any tablet range
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{20}, dht::token{25}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 5}, // before
{30, 35}, // after
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
{
// Edge case: SSTable touching tablet boundary
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{4, 5}, // touches start, non-inclusive, skip
{10, 11}, // touches end
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
{
// No tablets, but some SSTables
auto collection = get_tablet_sstable_collection();
auto ssts = make_sstables_with_ranges(env,
{
{0, 5},
{10, 15},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
BOOST_REQUIRE_EQUAL(res.size(), 0); // no tablets → nothing to classify
}
{
// No SSTables, but some tablets
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{10}, dht::token{15}});
std::vector<shared_sstable> ssts; // empty
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
}
{
// No tablets and no SSTables
auto collection = get_tablet_sstable_collection();
std::vector<shared_sstable> ssts; // empty
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
BOOST_REQUIRE_EQUAL(res.size(), 0);
}
{
// SSTable spanning two tablets
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}}, dht::token_range{dht::token{5}, dht::token{9}});
auto ssts = make_sstables_with_ranges(env,
{
{2, 7}, // spans both tablets
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
// Tablet [0,4] sees partial overlap
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
// Tablet [5,9] sees partial overlap
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
}
{
// SSTable spanning three tablets with a hole in between
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{3}},
dht::token_range{dht::token{4}, dht::token{6}},
dht::token_range{dht::token{8}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{2, 9}, // spans across tablets 1,2,3 and hole [7]
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
}
{
// SSTable fully covering one tablet and partially overlapping another
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{6}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 7}, // fully covers first tablet, partial in second
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
}
});
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -395,7 +395,7 @@ SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) {
assert_that(msg).is_rows().is_empty();
msg = e.execute_cql("select * from system_distributed.view_build_status").get();
assert_that(msg).is_rows().is_empty();
}, 30);
});
});
}

View File

@@ -16,26 +16,16 @@ from test.cluster.util import get_topology_coordinator, new_test_keyspace, recon
logger = logging.getLogger(__name__)
# This test makes sure that view building is done mainly in the streaming
# scheduling group. We check that by grepping all relevant logs in TRACE mode
# and verifying that they come from the streaming scheduling group.
#
# For more context, see: https://github.com/scylladb/scylladb/issues/21232.
# This test reproduces the issue in non-tablet mode.
# This test makes sure that view building is done mainly in the streaming scheduling group
# and not the gossip scheduling group. We do that by measuring the time each group was
# busy during the view building process and confirming that the gossip group was busy
# much less than the streaming group.
# Reproduces https://github.com/scylladb/scylladb/issues/21232
@pytest.mark.asyncio
@skip_mode('debug', 'the test needs to do some work which takes too much time in debug mode')
async def test_view_building_scheduling_group(manager: ManagerClient):
# Note: The view building coordinator works in the gossiping scheduling group,
# and we intentionally omit it here.
# Note: We include "view" for keyspaces that don't use the view building coordinator
# and will follow the legacy path instead.
loggers = ["view_building_worker", "view_consumer", "view_update_generator", "view"]
# Flatten the list of lists.
cmdline = sum([["--logger-log-level", f"{logger}=trace"] for logger in loggers], [])
server = await manager.server_add(cmdline=cmdline)
server = await manager.server_add()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (p int, c int, PRIMARY KEY (p, c))")
@@ -45,30 +35,21 @@ async def test_view_building_scheduling_group(manager: ManagerClient):
batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(inserts) + "\nAPPLY BATCH\n"
await manager.cql.run_async(batch)
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
metrics_before = await manager.metrics.query(server.ip_addr)
ms_gossip_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT p, c FROM {ks}.tab WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
await wait_for_view(cql, 'mv', 1)
logger_alternative = "|".join(loggers)
pattern = rf"\[shard [0-9]+:(.+)\] ({logger_alternative}) - "
results = await log.grep(pattern, from_mark=mark)
# Sanity check. If there are no logs, something's wrong.
assert len(results) > 0
# In case of non-tablet keyspaces, we won't use the view building coordinator.
# Instead, view updates will follow the legacy path. Along the way, we'll observe
# this message, which will be printed using another scheduling group, so let's
# filter it out.
predicate = lambda result: f"Building view {ks}.mv, starting at token" not in result[0]
results = list(filter(predicate, results))
# Take the first parenthesized match for each result, i.e. the scheduling group.
sched_groups = [matches[1] for _, matches in results]
assert all(sched_group == "strm" for sched_group in sched_groups)
metrics_after = await manager.metrics.query(server.ip_addr)
ms_gossip_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
ms_streaming = ms_streaming_after - ms_streaming_before
ms_statement = ms_gossip_after - ms_gossip_before
ratio = ms_statement / ms_streaming
print(f"ms_streaming: {ms_streaming}, ms_statement: {ms_statement}, ratio: {ratio}")
assert ratio < 0.1
# A sanity check test ensures that starting and shutting down Scylla when view building is
# disabled is conducted properly and we don't run into any issues.

View File

@@ -38,6 +38,7 @@ class TaskStats(NamedTuple):
entity: str
sequence_number: SequenceNum
shard: int
creation_time: str
start_time: str
end_time: str
@@ -54,6 +55,7 @@ class TaskStatus(NamedTuple):
entity: str
sequence_number: SequenceNum
is_abortable: bool
creation_time: str
start_time: str
end_time: str
error: str

View File

@@ -25,14 +25,12 @@ import json
from cassandra.auth import PlainTextAuthProvider
import threading
import random
import re
from test.cluster.util import get_replication
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.conftest import skip_mode
from test.pylib.tablets import get_tablet_replica
logger = logging.getLogger(__name__)
@@ -971,118 +969,3 @@ async def test_alternator_concurrent_rmw_same_partition_different_server(manager
t.join()
finally:
table.delete()
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
"""
Reproducer for issue #27353.
LWT requires that storage_proxy::cas() is invoked on a valid shard — the one
returned by sharder.try_get_shard_for_reads() for a tablets-based table.
The bug: if the current shard is invalid and we jump to the valid shard, that
new shard may become invalid again by the time we attempt to capture the ERM.
This leads to a failure of the CAS path.
The fix: retry the validity check and jump again if the current shard is already
invalid. We should exit the loop once the shard is valid *and* we hold a strong pointer
to the ERM — which prevents further tablet movements until the ERM is released.
This problem is specific to BatchWriteItem; other commands are already handled
correctly.
"""
config = alternator_config.copy()
config['alternator_write_isolation'] = 'always_use_lwt'
cmdline = [
'--logger-log-level', 'alternator-executor=trace',
'--logger-log-level', 'alternator_controller=trace',
'--logger-log-level', 'paxos=trace'
]
server = await manager.server_add(config=config, cmdline=cmdline)
alternator = get_alternator(server.ip_addr)
logger.info("Creating alternator test table")
table = alternator.create_table(TableName=unique_table_name(),
Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
BillingMode='PAY_PER_REQUEST',
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}])
table_name = table.name
ks_name = 'alternator_' + table_name
last_token = 7 # Any token works since we have only one tablet
(src_host_id, src_shard) = await get_tablet_replica(manager, server, ks_name, table_name, last_token)
dst_shard = 0 if src_shard == 1 else 1
logger.info("Inject 'intranode_migration_streaming_wait'")
await manager.api.enable_injection(server.ip_addr,
"intranode_migration_streaming_wait",
one_shot=False)
logger.info("Start tablet migration")
intranode_migration_task = asyncio.create_task(
manager.api.move_tablet(server.ip_addr, ks_name, table_name,
src_host_id, src_shard,
src_host_id, dst_shard, last_token))
logger.info("Open server logs")
log = await manager.server_open_log(server.server_id)
logger.info("Wait for intranode_migration_streaming_wait")
await log.wait_for("intranode_migration_streaming: waiting")
logger.info("Inject 'alternator_executor_batch_write_wait'")
await manager.api.enable_injection(server.ip_addr,
"alternator_executor_batch_write_wait",
one_shot=False,
parameters={
'table': table_name,
'keyspace': ks_name,
'shard': dst_shard
})
m = await log.mark()
# Start a background thread, which tries to hit the alternator_executor_batch_write_wait
# injection on the destination shard.
logger.info("Start a batch_write thread")
stop_event = threading.Event()
def run_batch():
alternator = get_alternator(server.ip_addr)
table = alternator.Table(table_name)
while not stop_event.is_set():
with table.batch_writer() as batch:
batch.put_item(Item={'p': 1, 'x': 'hellow world'})
t = ThreadWrapper(target=run_batch)
t.start()
logger.info("Waiting for 'alternator_executor_batch_write_wait: hit'")
await log.wait_for("alternator_executor_batch_write_wait: hit", from_mark=m)
# We have a batch request with "streaming" cas_shard on the destination shard.
# This means we have already made a decision to jump to the src_shard.
# Now we're releasing the tablet migration so that it reaches write_both_read_new and
# and invaldiates this decision.
m = await log.mark()
await manager.api.message_injection(server.ip_addr, "intranode_migration_streaming_wait")
# The next barrier must be for the write_both_read_new, we need a guarantee
# that the src_shard observed it
logger.info("Waiting for the next barrier")
await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
from_mark=m)
# Now we have a guarantee that a new barrier succeeded on the src_shard,
# this means the src_shard has already transitioned to write_both_read_new,
# and our batch write will have to jump back to the destination shard.
logger.info("Release the 'alternator_executor_batch_write_wait'")
await manager.api.message_injection(server.ip_addr, "alternator_executor_batch_write_wait")
logger.info("Waiting for migratino task to finish")
await intranode_migration_task
stop_event.set()
t.join()

View File

@@ -10,7 +10,6 @@ import logging
from test.pylib.rest_client import inject_error_one_shot
from test.cluster.util import new_test_keyspace
from test.pylib.util import gather_safely
logger = logging.getLogger(__name__)
@@ -34,12 +33,25 @@ async def test_broken_bootstrap(manager: ManagerClient):
except Exception:
pass
await gather_safely(*(manager.server_stop(srv.server_id) for srv in [server_a, server_b]))
await manager.server_stop(server_b.server_id)
await manager.server_stop(server_a.server_id)
stop_event = asyncio.Event()
async def worker():
logger.info("Worker started")
while not stop_event.is_set():
for i in range(100):
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
assert response[0].b == i
await asyncio.sleep(0.1)
logger.info("Worker stopped")
await manager.server_start(server_a.server_id)
await manager.driver_connect()
for i in range(100):
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
assert response[0].b == i
worker_task = asyncio.create_task(worker())
await asyncio.sleep(20)
stop_event.set()
await worker_task

View File

@@ -43,86 +43,6 @@ async def guarantee_repair_time_next_second():
# different than the previous one.
await asyncio.sleep(1)
async def do_test_tablet_repair_progress_split_merge(manager: ManagerClient, do_split=False, do_merge=False):
nr_tablets = 16
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=True, tablets=nr_tablets)
token = 'all'
logs = []
for s in servers:
logs.append(await manager.server_open_log(s.server_id))
# Skip repair for the listed tablet id
nr_tablets_skipped = 4
nr_tablets_repaired = nr_tablets - nr_tablets_skipped
await inject_error_on(manager, "tablet_repair_skip_sched", servers, params={'value':"0,1,5,8"})
# Request to repair all tablets
repair_res = await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
logging.info(f'{repair_res=}')
tablet_task_id = repair_res['tablet_task_id']
async def get_task_status(desc):
task_status = await manager.api.get_task_status(servers[0].ip_addr, tablet_task_id)
completed = int(task_status['progress_completed'])
total = int(task_status['progress_total'])
logging.info(f'{desc=} {completed=} {total=} {task_status=}')
return completed, total
async def wait_task_progress(wanted_complete, wanted_total):
while True:
completed, total = await get_task_status("wait_task_progress")
if completed == wanted_complete and total == wanted_total:
break
await asyncio.sleep(1)
async def get_task_status_and_check(desc):
completed, total = await get_task_status(desc)
assert completed == nr_tablets_repaired
assert total == nr_tablets
# 12 out of 16 tablets should finish
await wait_task_progress(nr_tablets_repaired, nr_tablets)
if do_split:
await get_task_status_and_check("before_split")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_increase", servers)
await logs[0].wait_for('Detected tablet split for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_increase", servers)
await get_task_status_and_check("after_split")
if do_merge:
await get_task_status_and_check("before_merge")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
await logs[0].wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
await get_task_status_and_check("after_merge")
# Wait for all repair to finish after all tablets can be scheduled to run repair
await inject_error_off(manager, "tablet_repair_skip_sched", servers)
await wait_task_progress(nr_tablets, nr_tablets)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=False, do_merge=False)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress_split(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=True)
@pytest.mark.asyncio
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/26844")
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_progress_merge(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_merge=True)
@pytest.mark.asyncio
async def test_tablet_manual_repair(manager: ManagerClient):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)

View File

@@ -4,8 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import ConsistencyLevel, SimpleStatement
from cassandra.policies import FallthroughRetryPolicy
from cassandra.query import ConsistencyLevel
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
@@ -1597,7 +1596,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
async def truncate_table():
await asyncio.sleep(10)
logger.info("Executing truncate during bootstrap")
await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))
await cql.run_async(f"TRUNCATE {ks}.test USING TIMEOUT 1m")
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")

View File

@@ -115,7 +115,7 @@ def compact_keyspace(cql, ks, flush_memtables=True):
args.extend([ks, cf])
run_nodetool(cql, "compact", *args)
def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
def take_snapshot(cql, table, tag, skip_flush):
ks, cf = table.split('.')
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
@@ -123,8 +123,6 @@ def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
args = ['--tag', tag, '--table', cf]
if skip_flush:
args.append('--skip-flush')
if use_sstable_identifier:
args.append('--use-sstable-identifier')
args.append(ks)
run_nodetool(cql, "snapshot", *args)

View File

@@ -163,11 +163,6 @@ public:
_sst->_shards.push_back(this_shard_id());
}
void set_first_and_last_keys(const dht::decorated_key& first_key, const dht::decorated_key& last_key) {
_sst->_first = first_key;
_sst->_last = last_key;
}
void rewrite_toc_without_component(component_type component) {
SCYLLA_ASSERT(component != component_type::TOC);
_sst->_recognized_components.erase(component);

View File

@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=False):
def check_snapshot_out(res, tag, ktlist, skip_flush):
"""Check that the output of nodetool snapshot contains the expected messages"""
if len(ktlist) == 0:
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=Fals
pattern = re.compile("Requested creating snapshot\\(s\\)"
f" for \\[{keyspaces}\\]"
f" with snapshot name \\[(.+)\\]"
f" and options \\{{skip_flush={str(skip_flush).lower()}, use_sstable_identifier={str(use_sstable_identifier).lower()}\\}}")
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
print(res)
print(pattern)
@@ -138,13 +138,13 @@ def test_snapshot_keyspace(nodetool):
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1"})
params={"tag": tag, "sf": "false", "kn": "ks1"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
@@ -155,13 +155,13 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl"})
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
@@ -186,7 +186,7 @@ class kn_param(NamedTuple):
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
tag = "my_snapshot"
req_params = {"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": param.kn}
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
if param.cf:
req_params["cf"] = param.cf
@@ -202,19 +202,19 @@ def test_snapshot_ktlist(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1"})
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1.tbl1,ks2.tbl2"})
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
@@ -229,8 +229,7 @@ def test_snapshot_ktlist(nodetool, option_name):
{"ks": ["ks1", "ks2"], "tbl": []},
])
@pytest.mark.parametrize("skip_flush", [False, True])
@pytest.mark.parametrize("use_sstable_identifier", [False, True])
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_identifier):
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
cmd = ["snapshot"]
params = {}
@@ -243,11 +242,8 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
if skip_flush:
cmd.append("--skip-flush")
if use_sstable_identifier:
cmd.append("--use-sstable-identifier")
params["sf"] = str(skip_flush).lower()
params["use_sstable_identifier"] = str(use_sstable_identifier).lower()
if ktlist:
if "tbl" in ktlist:
@@ -277,7 +273,7 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
expected_request("POST", "/storage_service/snapshots", params=params)
])
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush, use_sstable_identifier)
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
def test_snapshot_multiple_keyspace_with_table(nodetool):

View File

@@ -109,7 +109,6 @@ class ResourceGather(ABC):
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise

View File

@@ -789,7 +789,7 @@ class ScyllaServer:
while time.time() < self.start_time + self.TOPOLOGY_TIMEOUT and not self.stop_event.is_set():
assert self.cmd is not None
if self.cmd.returncode is not None:
if self.cmd.returncode:
self.cmd = None
if expected_error is not None:
with self.log_filename.open("r", encoding="utf-8") as log_file:

View File

@@ -2362,23 +2362,16 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
params["sf"] = "false";
}
if (vm.contains("use-sstable-identifier")) {
params["use_sstable_identifier"] = "true";
} else {
params["use_sstable_identifier"] = "false";
}
client.post("/storage_service/snapshots", params);
if (kn_msg.empty()) {
kn_msg = params["kn"];
}
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skip_flush={}, use_sstable_identifier={}}}\n",
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
kn_msg,
params["tag"],
params["sf"],
params["use_sstable_identifier"]);
params["sf"]);
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
}
@@ -3181,7 +3174,7 @@ void tasks_print_status(const rjson::value& res) {
auto status = res.GetObject();
for (const auto& x: status) {
if (x.value.IsString()) {
if (strcmp(x.name.GetString(), "start_time") == 0 || strcmp(x.name.GetString(), "end_time") == 0) {
if (strcmp(x.name.GetString(), "creation_time") == 0 || strcmp(x.name.GetString(), "start_time") == 0 || strcmp(x.name.GetString(), "end_time") == 0) {
fmt::print("{}: {}\n", x.name.GetString(), get_time(x.value.GetString()));
} else {
fmt::print("{}: {}\n", x.name.GetString(), x.value.GetString());
@@ -3233,6 +3226,7 @@ void tasks_add_tree_to_statuses_lists(Tabulate& table, const rjson::value& res)
rjson::to_string_view(status["scope"]),
rjson::to_string_view(status["state"]),
status["is_abortable"].GetBool(),
get_time(rjson::to_string_view(status["creation_time"])),
get_time(rjson::to_string_view(status["start_time"])),
get_time(rjson::to_string_view(status["end_time"])),
rjson::to_string_view(status["error"]),
@@ -3252,7 +3246,7 @@ void tasks_add_tree_to_statuses_lists(Tabulate& table, const rjson::value& res)
void tasks_print_trees(const std::vector<rjson::value>& res) {
Tabulate table;
table.add("id", "type", "kind", "scope", "state",
"is_abortable", "start_time", "end_time", "error", "parent_id",
"is_abortable", "creation_time", "start_time", "end_time", "error", "parent_id",
"sequence_number", "shard", "keyspace", "table", "entity",
"progress_units", "total", "completed", "children_ids");
@@ -3266,7 +3260,7 @@ void tasks_print_trees(const std::vector<rjson::value>& res) {
void tasks_print_stats_list(const rjson::value& res) {
auto stats = res.GetArray();
Tabulate table;
table.add("task_id", "type", "kind", "scope", "state", "sequence_number", "keyspace", "table", "entity", "shard", "start_time", "end_time");
table.add("task_id", "type", "kind", "scope", "state", "sequence_number", "keyspace", "table", "entity", "shard", "creation_time", "start_time", "end_time");
for (auto& element : stats) {
const auto& s = element.GetObject();
@@ -3280,6 +3274,7 @@ void tasks_print_stats_list(const rjson::value& res) {
rjson::to_string_view(s["table"]),
rjson::to_string_view(s["entity"]),
s["shard"].GetUint(),
get_time(rjson::to_string_view(s["creation_time"])),
get_time(rjson::to_string_view(s["start_time"])),
get_time(rjson::to_string_view(s["end_time"])));
}
@@ -4605,7 +4600,6 @@ For more information, see: {}
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
typed_option<sstring>("tag,t", "The name of the snapshot"),
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
typed_option<>("use-sstable-identifier", "Use the sstable identifier UUID, if available, rather than the sstable generation for the sstable file names within the snapshot dir and the manifest file"),
},
{
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),

View File

@@ -1,4 +1,4 @@
FROM registry.fedoraproject.org/fedora:43
FROM docker.io/fedora:42
ARG CLANG_BUILD="SKIP"
ARG CLANG_ARCHIVES

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-43-20251208
docker.io/scylladb/scylla-toolchain:fedora-42-20251122

View File

@@ -65,7 +65,7 @@ SCYLLA_BUILD_DIR_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_BUILD_DIR}"
SCYLLA_NINJA_FILE_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_NINJA_FILE}"
# Which LLVM release to build in order to compile Scylla
LLVM_CLANG_TAG=21.1.6
LLVM_CLANG_TAG=20.1.8
CLANG_ARCHIVE=$(cd "${SCYLLA_DIR}" && realpath -m "${CLANG_ARCHIVE}")
@@ -186,3 +186,7 @@ if [[ $? -ne 0 ]]; then
fi
set -e
tar -C / -xpzf "${CLANG_ARCHIVE}"
dnf remove -y clang clang-libs
# above package removal might have removed those symbolic links, which will cause ccache not to work later on. Manually restore them.
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang++