Compare commits
7 Commits
copilot/im
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea26e4b3a5 | ||
|
|
3b793ef09f | ||
|
|
df0a59ba03 | ||
|
|
69024a09b2 | ||
|
|
bb8f28a1ab | ||
|
|
32bc7e3a1c | ||
|
|
fb4e37248d |
@@ -729,14 +729,6 @@
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"use_sstable_identifier",
|
||||
"description":"Use the sstable identifier UUID, if available, rather than the sstable generation.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
@@ -349,9 +349,13 @@
|
||||
"type":"long",
|
||||
"description":"The shard the task is running on"
|
||||
},
|
||||
"creation_time":{
|
||||
"type":"datetime",
|
||||
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
|
||||
},
|
||||
"start_time":{
|
||||
"type":"datetime",
|
||||
"description":"The start time of the task; unspecified (equal to epoch) when state == created"
|
||||
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
|
||||
},
|
||||
"end_time":{
|
||||
"type":"datetime",
|
||||
@@ -398,13 +402,17 @@
|
||||
"type":"boolean",
|
||||
"description":"Boolean flag indicating whether the task can be aborted"
|
||||
},
|
||||
"creation_time":{
|
||||
"type":"datetime",
|
||||
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
|
||||
},
|
||||
"start_time":{
|
||||
"type":"datetime",
|
||||
"description":"The start time of the task"
|
||||
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
|
||||
},
|
||||
"end_time":{
|
||||
"type":"datetime",
|
||||
"description":"The end time of the task (unspecified when the task is not completed)"
|
||||
"description":"The end time of the task (when execution completed); unspecified (equal to epoch) when the task is not completed"
|
||||
},
|
||||
"error":{
|
||||
"type":"string",
|
||||
|
||||
@@ -2020,16 +2020,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_families = split(req->get_query_param("cf"), ",");
|
||||
auto sfopt = req->get_query_param("sf");
|
||||
auto usiopt = req->get_query_param("use_sstable_identifier");
|
||||
db::snapshot_options opts = {
|
||||
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
|
||||
.use_sstable_identifier = strcasecmp(usiopt.c_str(), "true") == 0
|
||||
};
|
||||
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
|
||||
try {
|
||||
if (column_families.empty()) {
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
@@ -2037,7 +2033,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
|
||||
}
|
||||
co_return json_void();
|
||||
} catch (...) {
|
||||
@@ -2072,8 +2068,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
|
||||
if (!info.snapshot_tag.empty()) {
|
||||
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
|
||||
}
|
||||
|
||||
compaction::compaction_stats stats;
|
||||
|
||||
@@ -55,6 +55,7 @@ tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& g
|
||||
res.scope = status.scope;
|
||||
res.state = status.state;
|
||||
res.is_abortable = bool(status.is_abortable);
|
||||
res.creation_time = get_time(status.creation_time);
|
||||
res.start_time = get_time(status.start_time);
|
||||
res.end_time = get_time(status.end_time);
|
||||
res.error = status.error;
|
||||
@@ -83,6 +84,7 @@ tm::task_stats make_stats(tasks::task_stats stats) {
|
||||
res.table = stats.table;
|
||||
res.entity = stats.entity;
|
||||
res.shard = stats.shard;
|
||||
res.creation_time = get_time(stats.creation_time);
|
||||
res.start_time = get_time(stats.start_time);
|
||||
res.end_time = get_time(stats.end_time);;
|
||||
return res;
|
||||
|
||||
@@ -146,8 +146,7 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
|
||||
if (!info.snapshot_tag.empty()) {
|
||||
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
|
||||
@@ -165,8 +165,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
|
||||
service::topology_mutation_builder builder(ts);
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
rtbuilder.set("done", false);
|
||||
if (!qp.proxy().features().topology_global_request_queue) {
|
||||
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
|
||||
builder.set_global_topology_request_id(global_request_id);
|
||||
|
||||
@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
|
||||
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
if (tag.empty()) {
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
|
||||
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
|
||||
};
|
||||
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
|
||||
return check_snapshot_not_exist(ks_name, tag);
|
||||
});
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
|
||||
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
|
||||
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
if (ks_name.empty()) {
|
||||
throw std::runtime_error("You must supply a keyspace name");
|
||||
}
|
||||
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {
|
||||
|
||||
@@ -38,13 +38,10 @@ class backup_task_impl;
|
||||
|
||||
} // snapshot namespace
|
||||
|
||||
struct snapshot_options {
|
||||
bool skip_flush = false;
|
||||
bool use_sstable_identifier = false;
|
||||
};
|
||||
|
||||
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
|
||||
public:
|
||||
using skip_flush = bool_class<class skip_flush_tag>;
|
||||
|
||||
struct table_snapshot_details {
|
||||
int64_t total;
|
||||
int64_t live;
|
||||
@@ -73,8 +70,8 @@ public:
|
||||
*
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
|
||||
return take_snapshot(tag, {}, opts);
|
||||
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
|
||||
return take_snapshot(tag, {}, sf);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -83,7 +80,7 @@ public:
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
|
||||
*/
|
||||
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
|
||||
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
|
||||
/**
|
||||
* Takes the snapshot of multiple tables. A snapshot name must be specified.
|
||||
@@ -92,7 +89,7 @@ public:
|
||||
* @param tables a vector of tables names to snapshot
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
|
||||
/**
|
||||
* Remove the snapshot with the given name from the given keyspaces.
|
||||
@@ -130,8 +127,8 @@ private:
|
||||
|
||||
friend class snapshot::backup_task_impl;
|
||||
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -137,8 +137,6 @@ namespace {
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
// repair tasks
|
||||
system_keyspace::REPAIR_TASKS,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.is_group0_table = true;
|
||||
@@ -464,24 +462,6 @@ schema_ptr system_keyspace::repair_history() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::repair_tasks() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
|
||||
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
|
||||
.with_column("task_uuid", uuid_type, column_kind::partition_key)
|
||||
.with_column("operation", utf8_type, column_kind::clustering_key)
|
||||
// First and last token of the tablet
|
||||
.with_column("first_token", long_type, column_kind::clustering_key)
|
||||
.with_column("last_token", long_type, column_kind::clustering_key)
|
||||
.with_column("timestamp", timestamp_type)
|
||||
.with_column("table_uuid", uuid_type, column_kind::static_column)
|
||||
.set_comment("Record tablet repair tasks")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::built_indexes() {
|
||||
static thread_local auto built_indexes = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
|
||||
@@ -2331,7 +2311,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
corrupt_data(),
|
||||
scylla_local(), db::schema_tables::scylla_table_schema_history(),
|
||||
repair_history(),
|
||||
repair_tasks(),
|
||||
v3::views_builds_in_progress(), v3::built_views(),
|
||||
v3::scylla_views_builds_in_progress(),
|
||||
v3::truncated(),
|
||||
@@ -2573,32 +2552,6 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
|
||||
// By default, expire repair task entries after 10 days; this should be enough time for the management tools to query them
|
||||
constexpr int ttl = 10 * 24 * 3600;
|
||||
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
|
||||
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
|
||||
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
|
||||
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
|
||||
utils::chunked_vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
|
||||
co_return cmuts;
|
||||
}
|
||||
|
||||
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
|
||||
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
|
||||
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
|
||||
repair_task_entry ent;
|
||||
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
|
||||
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
|
||||
ent.first_token = row.get_as<int64_t>("first_token");
|
||||
ent.last_token = row.get_as<int64_t>("last_token");
|
||||
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
|
||||
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
|
||||
co_await f(std::move(ent));
|
||||
co_return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
|
||||
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
|
||||
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
|
||||
@@ -3770,35 +3723,4 @@ future<> system_keyspace::apply_mutation(mutation m) {
|
||||
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
|
||||
}
|
||||
|
||||
// The names are persisted in system tables so should not be changed.
|
||||
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
|
||||
{system_keyspace::repair_task_operation::requested, "requested"},
|
||||
{system_keyspace::repair_task_operation::finished, "finished"},
|
||||
};
|
||||
|
||||
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
|
||||
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
|
||||
for (auto&& [v, s] : repair_task_operation_to_name) {
|
||||
result.emplace(s, v);
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
|
||||
auto i = repair_task_operation_to_name.find(op);
|
||||
if (i == repair_task_operation_to_name.end()) {
|
||||
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
|
||||
}
|
||||
return i->second;
|
||||
}
|
||||
|
||||
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
|
||||
return repair_task_operation_from_name.at(name);
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
|
||||
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
|
||||
}
|
||||
|
||||
@@ -57,8 +57,6 @@ namespace paxos {
|
||||
struct topology_request_state;
|
||||
|
||||
class group0_guard;
|
||||
|
||||
class raft_group0_client;
|
||||
}
|
||||
|
||||
namespace netw {
|
||||
@@ -186,7 +184,6 @@ public:
|
||||
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
|
||||
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
|
||||
static constexpr auto REPAIR_HISTORY = "repair_history";
|
||||
static constexpr auto REPAIR_TASKS = "repair_tasks";
|
||||
static constexpr auto GROUP0_HISTORY = "group0_history";
|
||||
static constexpr auto DISCOVERY = "discovery";
|
||||
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
|
||||
@@ -263,7 +260,6 @@ public:
|
||||
static schema_ptr raft();
|
||||
static schema_ptr raft_snapshots();
|
||||
static schema_ptr repair_history();
|
||||
static schema_ptr repair_tasks();
|
||||
static schema_ptr group0_history();
|
||||
static schema_ptr discovery();
|
||||
static schema_ptr broadcast_kv_store();
|
||||
@@ -402,22 +398,6 @@ public:
|
||||
int64_t range_end;
|
||||
};
|
||||
|
||||
enum class repair_task_operation {
|
||||
requested,
|
||||
finished,
|
||||
};
|
||||
static sstring repair_task_operation_to_string(repair_task_operation op);
|
||||
static repair_task_operation repair_task_operation_from_string(const sstring& name);
|
||||
|
||||
struct repair_task_entry {
|
||||
tasks::task_id task_uuid;
|
||||
repair_task_operation operation;
|
||||
int64_t first_token;
|
||||
int64_t last_token;
|
||||
db_clock::time_point timestamp;
|
||||
table_id table_uuid;
|
||||
};
|
||||
|
||||
struct topology_requests_entry {
|
||||
utils::UUID id;
|
||||
utils::UUID initiating_host;
|
||||
@@ -439,10 +419,6 @@ public:
|
||||
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
|
||||
future<> get_repair_history(table_id, repair_history_consumer f);
|
||||
|
||||
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
|
||||
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
|
||||
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
|
||||
|
||||
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
|
||||
future<replay_positions> get_truncated_positions(table_id);
|
||||
future<> drop_truncation_rp_records();
|
||||
@@ -750,8 +726,3 @@ public:
|
||||
}; // class system_keyspace
|
||||
|
||||
} // namespace db
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
|
||||
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
2
dist/common/sysconfig/scylla-node-exporter
vendored
2
dist/common/sysconfig/scylla-node-exporter
vendored
@@ -1 +1 @@
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
|
||||
|
||||
@@ -45,6 +45,22 @@ immediately after it's finished.
|
||||
|
||||
A flag which determines if a task can be aborted through API.
|
||||
|
||||
# Task timing fields
|
||||
|
||||
Tasks have three timing fields that track different stages of their lifecycle:
|
||||
|
||||
- `creation_time` - When the task was created/queued. This is extracted from the task's
|
||||
UUID (which is a timeuuid) and represents the moment the task request was submitted.
|
||||
- `start_time` - When the task actually began executing. For tasks that are queued, this
|
||||
will be unspecified (equal to epoch) until execution starts. For node operations
|
||||
like decommission, this is set when the request is picked up for execution by the
|
||||
topology coordinator.
|
||||
- `end_time` - When the task completed (successfully or with an error). This is
|
||||
unspecified (equal to epoch) until the task finishes.
|
||||
|
||||
The difference between `creation_time` and `start_time` represents the time a task
|
||||
spent waiting in the queue before execution began.
|
||||
|
||||
# Type vs scope vs kind
|
||||
|
||||
`type` of a task describes what operation is covered by a task,
|
||||
|
||||
@@ -17,7 +17,7 @@ SYNOPSIS
|
||||
[(-u <username> | --username <username>)] snapshot
|
||||
[(-cf <table> | --column-family <table> | --table <table>)]
|
||||
[(-kc <kclist> | --kc.list <kclist>)]
|
||||
[(-sf | --skip-flush)] [--use-sstable-identifier] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
|
||||
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
|
||||
|
||||
OPTIONS
|
||||
.......
|
||||
@@ -37,8 +37,6 @@ Parameter Descriptio
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
--use-sstable-identifier Use the sstable identifier UUID, if available, rather than the sstable generation.
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-t <tag> / --tag <tag> The name of the snapshot
|
||||
==================================================================== =====================================================================================
|
||||
|
||||
|
||||
@@ -42,21 +42,21 @@ For single list:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
|
||||
5116ddb6-85b5-4c3e-94fb-72128f15d7b4 repair node keyspace done 3 abc 0 2025-01-16T16:12:11Z 2025-01-16T16:12:13Z
|
||||
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
|
||||
5116ddb6-85b5-4c3e-94fb-72128f15d7b4 repair node keyspace done 3 abc 0 2025-01-16T16:12:08Z 2025-01-16T16:12:11Z 2025-01-16T16:12:13Z
|
||||
|
||||
With repetition:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
|
||||
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:57Z
|
||||
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
|
||||
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
|
||||
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:54Z 2025-01-16T16:12:57Z
|
||||
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:42Z 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
|
||||
|
||||
task_id type kind scope state sequence_number keyspace table entity shard start_time end_time
|
||||
1e535f9b-97fa-4788-a956-8f3216a6ea8d repair node keyspace created 6 abc 0
|
||||
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:57Z
|
||||
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
|
||||
task_id type kind scope state sequence_number keyspace table entity shard creation_time start_time end_time
|
||||
1e535f9b-97fa-4788-a956-8f3216a6ea8d repair node keyspace created 6 abc 0 2025-01-16T16:13:02Z
|
||||
d8926ee7-0faf-47b7-bfeb-82477e0c7b33 repair node keyspace running 5 abc 0 2025-01-16T16:12:54Z 2025-01-16T16:12:57Z
|
||||
1e028cb8-31a3-45ed-8728-af7a1ab586f6 repair node keyspace done 4 abc 0 2025-01-16T16:12:42Z 2025-01-16T16:12:45Z 2025-01-16T16:12:47Z
|
||||
|
||||
See also
|
||||
--------
|
||||
|
||||
@@ -25,6 +25,7 @@ Example output
|
||||
scope: keyspace
|
||||
state: running
|
||||
is_abortable: true
|
||||
creation_time: 2024-07-29T15:48:50Z
|
||||
start_time: 2024-07-29T15:48:55Z
|
||||
end_time:
|
||||
error:
|
||||
|
||||
@@ -26,22 +26,22 @@ For single task:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
id type kind scope state is_abortable start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
|
||||
be5559ea-bc5a-428c-b8ce-d14eac7a1765 repair node keyspace done true 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z none 1 0 abc ranges 4 4 [{task_id: 542e38cb-9ad4-40aa-9010-de2630004e55, node: 127.0.0.1 }, {task_id: 8974ebcc-1e87-4040-88fe-f2438261f7fb, node: 127.0.0.1 }]
|
||||
542e38cb-9ad4-40aa-9010-de2630004e55 repair node shard done false 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 0 abc ranges 2 2 []
|
||||
8974ebcc-1e87-4040-88fe-f2438261f7fb repair node shard done false 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 1 abc ranges 2 2 []
|
||||
id type kind scope state is_abortable creation_time start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
|
||||
be5559ea-bc5a-428c-b8ce-d14eac7a1765 repair node keyspace done true 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z none 1 0 abc ranges 4 4 [{task_id: 542e38cb-9ad4-40aa-9010-de2630004e55, node: 127.0.0.1 }, {task_id: 8974ebcc-1e87-4040-88fe-f2438261f7fb, node: 127.0.0.1 }]
|
||||
542e38cb-9ad4-40aa-9010-de2630004e55 repair node shard done false 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 0 abc ranges 2 2 []
|
||||
8974ebcc-1e87-4040-88fe-f2438261f7fb repair node shard done false 2024-07-29T16:06:43Z 2024-07-29T16:06:46Z 2024-07-29T16:06:46Z be5559ea-bc5a-428c-b8ce-d14eac7a1765 1 1 abc ranges 2 2 []
|
||||
|
||||
For all tasks:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
id type kind scope state is_abortable start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
|
||||
16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc repair node keyspace done true 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z none 1 0 abc ranges 4 4 [{task_id: e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee, node: 127.0.0.1 }, {task_id: 49eb5797-b67e-46b0-9365-4460f7cf988a, node: 127.0.0.1 }]
|
||||
e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee repair node shard done false 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 0 abc ranges 2 2 []
|
||||
49eb5797-b67e-46b0-9365-4460f7cf988a repair node shard done false 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 1 abc ranges 2 2 []
|
||||
82d7b2a4-146e-4a72-ba93-c66d5b4e9867 offstrategy compaction node keyspace done true 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z none 954 0 abc 1 1 [{task_id: 9818277b-238d-4298-a56b-c0d2153bf140, node: 127.0.0.1 }, {task_id: c1eb0701-ad7a-45ff-956f-7b8d671fc5db, node: 127.0.0.1 }]
|
||||
9818277b-238d-4298-a56b-c0d2153bf140 offstrategy compaction node shard done false 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 0 abc 1 1 []
|
||||
c1eb0701-ad7a-45ff-956f-7b8d671fc5db offstrategy compaction node shard done false 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 1 abc 1 1 []
|
||||
id type kind scope state is_abortable creation_time start_time end_time error parent_id sequence_number shard keyspace table entity progress_units total completed children_ids
|
||||
16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc repair node keyspace done true 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z none 1 0 abc ranges 4 4 [{task_id: e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee, node: 127.0.0.1 }, {task_id: 49eb5797-b67e-46b0-9365-4460f7cf988a, node: 127.0.0.1 }]
|
||||
e0aa1aa4-58ca-4bfb-b3e6-74e5f3a0f6ee repair node shard done false 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 0 abc ranges 2 2 []
|
||||
49eb5797-b67e-46b0-9365-4460f7cf988a repair node shard done false 2024-07-29T16:34:43Z 2024-07-29T16:34:46Z 2024-07-29T16:34:46Z 16eafb1e-8b2e-48e6-bd7a-432ca3d8b9fc 1 1 abc ranges 2 2 []
|
||||
82d7b2a4-146e-4a72-ba93-c66d5b4e9867 offstrategy compaction node keyspace done true 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z none 954 0 abc 1 1 [{task_id: 9818277b-238d-4298-a56b-c0d2153bf140, node: 127.0.0.1 }, {task_id: c1eb0701-ad7a-45ff-956f-7b8d671fc5db, node: 127.0.0.1 }]
|
||||
9818277b-238d-4298-a56b-c0d2153bf140 offstrategy compaction node shard done false 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 0 abc 1 1 []
|
||||
c1eb0701-ad7a-45ff-956f-7b8d671fc5db offstrategy compaction node shard done false 2024-07-29T16:34:13Z 2024-07-29T16:34:16Z 2024-07-29T16:34:16Z 82d7b2a4-146e-4a72-ba93-c66d5b4e9867 954 1 abc 1 1 []
|
||||
|
||||
See also
|
||||
--------
|
||||
|
||||
8
docs/poetry.lock
generated
8
docs/poetry.lock
generated
@@ -1018,14 +1018,14 @@ sphinx-markdown-tables = "0.0.17"
|
||||
|
||||
[[package]]
|
||||
name = "sphinx-scylladb-theme"
|
||||
version = "1.8.10"
|
||||
version = "1.8.9"
|
||||
description = "A Sphinx Theme for ScyllaDB documentation projects"
|
||||
optional = false
|
||||
python-versions = "<4.0,>=3.10"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "sphinx_scylladb_theme-1.8.10-py3-none-any.whl", hash = "sha256:8b930f33bec7308ccaa92698ebb5ad85059bcbf93a463f92917aeaf473fce632"},
|
||||
{file = "sphinx_scylladb_theme-1.8.10.tar.gz", hash = "sha256:8a78a9b692d9a946be2c4a64aa472fd82204cc8ea0b1ee7f60de6db35b356326"},
|
||||
{file = "sphinx_scylladb_theme-1.8.9-py3-none-any.whl", hash = "sha256:f8649a7753a29494fd2b417d1cb855035dddb9ebd498ea033fd73f5f9338271e"},
|
||||
{file = "sphinx_scylladb_theme-1.8.9.tar.gz", hash = "sha256:ab7cda4c10a0d067c5c3a45f7b1f68cb8ebefe135a0be0738bfa282a344769b6"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -1603,4 +1603,4 @@ files = [
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.10"
|
||||
content-hash = "0ae673106f45d3465cbdabbf511e165ca44feadd34d7753f2e68093afaa95c79"
|
||||
content-hash = "74912627a3f424290ed7889451c0bdb1a862ab85b1d07c85f4f3b8c34f32a020"
|
||||
|
||||
@@ -9,7 +9,7 @@ package-mode = false
|
||||
python = "^3.10"
|
||||
pygments = "^2.18.0"
|
||||
redirects_cli ="^0.1.3"
|
||||
sphinx-scylladb-theme = "^1.8.10"
|
||||
sphinx-scylladb-theme = "^1.8.9"
|
||||
sphinx-sitemap = "^2.6.0"
|
||||
sphinx-autobuild = "^2024.4.19"
|
||||
Sphinx = "^7.3.7"
|
||||
|
||||
@@ -143,7 +143,6 @@ public:
|
||||
|
||||
gms::feature tablet_incremental_repair { *this, "TABLET_INCREMENTAL_REPAIR"sv };
|
||||
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
|
||||
gms::feature tablet_repair_tasks_table { *this, "TABLET_REPAIR_TASKS_TABLE"sv };
|
||||
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
|
||||
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "tasks/task_handler.hh"
|
||||
#include "tasks/virtual_task_hint.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include <variant>
|
||||
#include "utils/overloaded_functor.hh"
|
||||
|
||||
@@ -90,6 +91,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
|
||||
.scope = "cluster",
|
||||
.state = get_state(entry),
|
||||
.is_abortable = co_await is_abortable(std::move(hint)),
|
||||
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(id.uuid())),
|
||||
.start_time = entry.start_time,
|
||||
.end_time = entry.end_time,
|
||||
.error = entry.error,
|
||||
@@ -167,6 +169,7 @@ future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
|
||||
.table = "",
|
||||
.entity = "",
|
||||
.shard = 0,
|
||||
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(id)),
|
||||
.start_time = entry.start_time,
|
||||
.end_time = entry.end_time
|
||||
};
|
||||
|
||||
@@ -3844,83 +3844,3 @@ future<uint32_t> repair_service::get_next_repair_meta_id() {
|
||||
// Returns the host id reported by this shard's gossiper instance.
locator::host_id repair_service::my_host_id() const noexcept {
    auto& local_gossiper = _gossiper.local();
    return local_gossiper.my_host_id();
}
|
||||
|
||||
// Counts how many ranges in ranges1 are fully contained in the union of the
// ranges in ranges2.
//
// ranges2 is sorted and collapsed into a list of disjoint intervals
// (overlapping or directly adjacent ranges are merged). ranges1 is then sorted
// and checked against that list with a single forward scan. Both vectors are
// taken by value because they are sorted in place.
//
// Returns 0 when either input is empty. Yields to the scheduler between
// iterations so that large inputs do not stall the reactor.
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
    if (ranges1.empty() || ranges2.empty()) {
        co_return 0;
    }

    // Orders ranges by first_token, breaking ties by last_token.
    auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
        std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
            if (a.first_token != b.first_token) {
                return a.first_token < b.first_token;
            }
            return a.last_token < b.last_token;
        });
    };

    // True when `next` overlaps `current` or starts right after it ends.
    // The overlap test must run first: when it fails we know that
    // next.first_token > current.last_token >= INT64_MIN, so the subtraction
    // below cannot underflow. Checking `first_token - 1 <= last_token` directly
    // is signed overflow when first_token is INT64_MIN, and checking
    // `first_token <= last_token + 1` overflows when last_token is INT64_MAX.
    auto touches = [] (const tablet_token_range& current, const tablet_token_range& next) {
        return next.first_token <= current.last_token || next.first_token - 1 == current.last_token;
    };

    // First, merge overlapping and adjacent ranges in ranges2.
    sort(ranges2);
    utils::chunked_vector<tablet_token_range> merged;
    merged.push_back(ranges2[0]);
    for (size_t i = 1; i < ranges2.size(); ++i) {
        co_await coroutine::maybe_yield();
        if (touches(merged.back(), ranges2[i])) {
            merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
        } else {
            merged.push_back(ranges2[i]);
        }
    }

    // Count covered ranges using a linear scan. Both sequences are sorted by
    // first_token, so the merged iterator only ever moves forward.
    size_t covered_count = 0;
    auto it = merged.begin();
    auto end = merged.end();
    sort(ranges1);
    for (const auto& r1 : ranges1) {
        co_await coroutine::maybe_yield();
        // Advance the merged iterator only if the current merged range ends
        // before the current r1 starts.
        while (it != end && it->last_token < r1.first_token) {
            co_await coroutine::maybe_yield();
            ++it;
        }
        // If we have exhausted the merged ranges, no further r1 can be covered.
        if (it == end) {
            break;
        }
        // Check if the current merged range covers r1.
        if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
            covered_count++;
        }
    }

    co_return covered_count;
}
|
||||
|
||||
// Computes the progress of the tablet repair task identified by task_uuid.
//
// Returns std::nullopt when the tablet_repair_tasks_table feature is not
// enabled. Otherwise the task's entries are read through
// system_keyspace::get_repair_task(): token ranges recorded as "requested" and
// as "finished" are collected separately, and the result reports how many of
// the requested ranges are covered by the finished ones (as computed by
// count_finished_tablets()). The table id is taken from the entries read.
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
    if (!_db.local().features().tablet_repair_tasks_table) {
        co_return std::nullopt;
    }

    using operation = db::system_keyspace::repair_task_operation;
    utils::chunked_vector<tablet_token_range> requested_ranges;
    utils::chunked_vector<tablet_token_range> finished_ranges;
    table_id table;

    co_await _sys_ks.local().get_repair_task(task_uuid, [&table, &requested_ranges, &finished_ranges] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
        rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
        switch (entry.operation) {
        case operation::requested:
            requested_ranges.push_back({entry.first_token, entry.last_token});
            break;
        case operation::finished:
            finished_ranges.push_back({entry.first_token, entry.last_token});
            break;
        default:
            // Other operations do not contribute to the progress counters.
            break;
        }
        table = entry.table_uuid;
        co_return;
    });

    // Sizes must be captured before the vectors are moved into the counter.
    const auto requested_count = requested_ranges.size();
    const auto finished_raw_count = finished_ranges.size();
    const auto finished_count = co_await count_finished_tablets(std::move(requested_ranges), std::move(finished_ranges));

    auto result = repair_task_progress{requested_count, finished_count, table};
    rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
            task_uuid, table, requested_count, finished_count, result.progress(), finished_raw_count);
    co_return result;
}
|
||||
|
||||
@@ -99,15 +99,6 @@ public:
|
||||
|
||||
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
|
||||
|
||||
struct repair_task_progress {
|
||||
size_t requested;
|
||||
size_t finished;
|
||||
table_id table_uuid;
|
||||
float progress() const {
|
||||
return requested == 0 ? 1.0 : float(finished) / requested;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
sharded<service::topology_state_machine>& _tsm;
|
||||
sharded<gms::gossiper>& _gossiper;
|
||||
@@ -231,9 +222,6 @@ private:
|
||||
public:
|
||||
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas, locator::tablet_transition_stage stage);
|
||||
|
||||
|
||||
future<std::optional<repair_task_progress>> get_tablet_repair_task_progress(tasks::task_id task_uuid);
|
||||
|
||||
private:
|
||||
|
||||
future<repair_update_system_table_response> repair_update_system_table_handler(
|
||||
@@ -338,12 +326,3 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
|
||||
schema_ptr s, uint64_t seed, repair_master is_master,
|
||||
reader_permit permit, repair_hasher hasher);
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization = std::nullopt, repair_meta* rm = nullptr);
|
||||
|
||||
// Token range of a single tablet, described by its first and last token.
// Both bounds are stored as raw 64-bit token values. The type is a plain
// aggregate, so it can be brace-initialized as {first, last}.
struct tablet_token_range {
    int64_t first_token;
    int64_t last_token;
};
|
||||
|
||||
// Function to count the number of ranges in ranges1 covered by the merged ranges of ranges2.
|
||||
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2);
|
||||
|
||||
@@ -2810,26 +2810,26 @@ future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& shar
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
|
||||
if (!opts.skip_flush) {
|
||||
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, bool skip_flush) {
|
||||
if (!skip_flush) {
|
||||
co_await flush_table_on_all_shards(sharded_db, uuid);
|
||||
}
|
||||
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag, opts);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
|
||||
}
|
||||
|
||||
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts) {
|
||||
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), opts] (auto& table_name) {
|
||||
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
|
||||
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
|
||||
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
|
||||
return snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
|
||||
return snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts) {
|
||||
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
|
||||
auto& ks = sharded_db.local().find_keyspace(ks_name);
|
||||
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), opts] (const auto& pair) -> future<> {
|
||||
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
|
||||
auto uuid = pair.second->id();
|
||||
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
|
||||
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2951,12 +2951,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
|
||||
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
|
||||
auto name = snapshot_name_opt.value_or(
|
||||
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
|
||||
// Use the sstable identifier in snapshot names to allow de-duplication of sstables
|
||||
// at backup time even if they were migrated across shards or nodes and were renamed and given a new generation.
|
||||
// We hard-code that here since we have no way to pass this option to auto-snapshot and
|
||||
// it is always safe to use the sstable identifier for the sstable generation.
|
||||
auto opts = db::snapshot_options{.use_sstable_identifier = true};
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
|
||||
}
|
||||
|
||||
co_await sharded_db.invoke_on_all([&] (database& db) {
|
||||
|
||||
@@ -1040,12 +1040,12 @@ public:
|
||||
private:
|
||||
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
|
||||
|
||||
future<snapshot_file_set> take_snapshot(sstring jsondir, db::snapshot_options opts);
|
||||
future<snapshot_file_set> take_snapshot(sstring jsondir);
|
||||
// Writes the table schema and the manifest of all files in the snapshot directory.
|
||||
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
|
||||
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
|
||||
public:
|
||||
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
|
||||
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
|
||||
|
||||
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
|
||||
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
|
||||
@@ -2009,9 +2009,9 @@ public:
|
||||
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
|
||||
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
|
||||
|
||||
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, bool skip_flush);
|
||||
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
|
||||
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
|
||||
|
||||
public:
|
||||
bool update_column_family(schema_ptr s);
|
||||
|
||||
@@ -3268,7 +3268,7 @@ future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstrin
|
||||
}
|
||||
|
||||
// Runs the orchestration code on an arbitrary shard to balance the load.
|
||||
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
|
||||
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
|
||||
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
|
||||
if (so == nullptr) {
|
||||
throw std::runtime_error("Snapshotting non-local tables is not implemented");
|
||||
@@ -3291,7 +3291,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
|
||||
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
|
||||
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
|
||||
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
|
||||
return table_shards->take_snapshot(jsondir, opts);
|
||||
return table_shards->take_snapshot(jsondir);
|
||||
}));
|
||||
});
|
||||
co_await io_check(sync_directory, jsondir);
|
||||
@@ -3300,22 +3300,19 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
|
||||
});
|
||||
}
|
||||
|
||||
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir, db::snapshot_options opts) {
|
||||
tlogger.trace("take_snapshot {}: use_sstable_identifier={}", jsondir, opts.use_sstable_identifier);
|
||||
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
|
||||
tlogger.trace("take_snapshot {}", jsondir);
|
||||
|
||||
auto sstable_deletion_guard = co_await get_sstable_list_permit();
|
||||
|
||||
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
|
||||
auto table_names = std::make_unique<std::unordered_set<sstring>>();
|
||||
|
||||
auto& ks_name = schema()->ks_name();
|
||||
auto& cf_name = schema()->cf_name();
|
||||
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&, opts] (sstables::shared_sstable sstable) -> future<> {
|
||||
auto gen = co_await io_check([sstable, &dir = jsondir, opts] {
|
||||
return sstable->snapshot(dir, opts.use_sstable_identifier);
|
||||
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
|
||||
table_names->insert(sstable->component_basename(sstables::component_type::Data));
|
||||
return io_check([sstable, &dir = jsondir] {
|
||||
return sstable->snapshot(dir);
|
||||
});
|
||||
auto fname = sstable->component_basename(ks_name, cf_name, sstable->get_version(), gen, sstable->get_format(), sstables::component_type::Data);
|
||||
table_names->insert(fname);
|
||||
});
|
||||
co_return make_foreign(std::move(table_names));
|
||||
}
|
||||
|
||||
238
scylla-gdb.py
238
scylla-gdb.py
@@ -267,246 +267,16 @@ class intrusive_set:
|
||||
|
||||
|
||||
class compact_radix_tree:
|
||||
"""Wrapper around compact_radix_tree::tree for GDB debugging.
|
||||
|
||||
Provides iteration and indexing by key (typically column_id) similar to std_map.
|
||||
The tree stores key-value pairs where keys are unsigned integers.
|
||||
|
||||
Example usage:
|
||||
tree = compact_radix_tree(row['_cells'])
|
||||
# Iterate over elements
|
||||
for key, value in tree:
|
||||
print(f"Column {key}: {value}")
|
||||
# Access by key
|
||||
cell = tree[column_id]
|
||||
# Check if key exists
|
||||
cell = tree.get(column_id, default=None)
|
||||
# Get all keys
|
||||
column_ids = tree.keys()
|
||||
|
||||
Note: Due to GDB limitations and compiler optimizations, full tree traversal
|
||||
is challenging. The implementation provides the std_map-like API but may not
|
||||
be able to extract all elements in optimized builds. In such cases, consider:
|
||||
- Using debug builds (-g -O0) for better introspection
|
||||
- Examining the tree structure directly with GDB commands
|
||||
- Using the C++ tree printer (compact_radix_tree::printer) in test code
|
||||
"""
|
||||
|
||||
def __init__(self, ref):
|
||||
"""Initialize from a gdb.Value representing a compact_radix_tree::tree instance."""
|
||||
self.ref = ref
|
||||
self.root = ref['_root']['_v']
|
||||
|
||||
# Get template arguments to determine key and value types
|
||||
tree_type = ref.type.strip_typedefs()
|
||||
self.value_type = tree_type.template_argument(0)
|
||||
# Index type defaults to unsigned int if not specified
|
||||
try:
|
||||
self.key_type = tree_type.template_argument(1)
|
||||
except RuntimeError:
|
||||
self.key_type = gdb.lookup_type('unsigned int')
|
||||
|
||||
# Cache for elements collected during traversal
|
||||
self._elements = None
|
||||
|
||||
# Constants from compact-radix-tree.hh
|
||||
# enum class layout : uint8_t { nil, indirect_tiny, indirect_small, ...}
|
||||
self.LAYOUT_NIL = 0
|
||||
self.RADIX_BITS = 7
|
||||
self.RADIX_MASK = (1 << self.RADIX_BITS) - 1
|
||||
|
||||
def is_empty(self):
|
||||
"""Check if the tree is empty."""
|
||||
try:
|
||||
layout = int(self.root['_base_layout'])
|
||||
return layout == self.LAYOUT_NIL
|
||||
except (gdb.error, gdb.MemoryError):
|
||||
return True
|
||||
|
||||
def _collect_elements(self):
|
||||
"""Collect all elements from the tree by traversing its structure.
|
||||
|
||||
Returns a list of (key, value) tuples sorted by key.
|
||||
This is cached after first call.
|
||||
"""
|
||||
if self._elements is not None:
|
||||
return self._elements
|
||||
|
||||
self._elements = []
|
||||
|
||||
if self.is_empty():
|
||||
return self._elements
|
||||
|
||||
try:
|
||||
# Traverse the tree structure
|
||||
# The tree is a radix tree with nodes that can be inner or leaf nodes
|
||||
# We'll do a depth-first traversal
|
||||
self._visit_node(self.root, 0, 0)
|
||||
|
||||
# Sort by key to ensure correct ordering
|
||||
self._elements.sort(key=lambda x: x[0])
|
||||
except (gdb.error, gdb.MemoryError) as e:
|
||||
# If traversal fails, we have at least collected what we could
|
||||
gdb.write(f"Warning: Failed to fully traverse compact_radix_tree: {e}\n")
|
||||
|
||||
return self._elements
|
||||
|
||||
def _visit_node(self, node, depth, prefix):
|
||||
"""Recursively visit a node and collect elements.
|
||||
|
||||
Args:
|
||||
node: The node_head to visit
|
||||
depth: Current depth in the tree
|
||||
prefix: Key prefix accumulated from parent nodes
|
||||
"""
|
||||
try:
|
||||
# Get node properties
|
||||
node_prefix = int(node['_prefix'])
|
||||
node_size = int(node['_size'])
|
||||
layout = int(node['_base_layout'])
|
||||
|
||||
if node_size == 0 or layout == self.LAYOUT_NIL:
|
||||
return
|
||||
|
||||
# Calculate the key size in bits
|
||||
# For uint32_t (column_id), this would be 32 bits
|
||||
key_bits = self.key_type.sizeof * 8
|
||||
|
||||
# Calculate leaf depth: the tree uses RADIX_BITS (7) bits per level
|
||||
# leaf_depth = ceil(key_bits / RADIX_BITS) - 1
|
||||
# The -1 accounts for the root level not being counted in depth
|
||||
leaf_depth = (key_bits + self.RADIX_BITS - 1) // self.RADIX_BITS - 1
|
||||
|
||||
# Extract prefix information from node_prefix
|
||||
# Prefix encoding: lower RADIX_BITS contain the prefix length,
|
||||
# upper bits contain the actual prefix value
|
||||
prefix_len = node_prefix & self.RADIX_MASK # Extract lower 7 bits for length
|
||||
prefix_value = node_prefix & ~self.RADIX_MASK # Extract upper bits for value
|
||||
|
||||
# Update prefix with node's contribution
|
||||
current_prefix = prefix | prefix_value
|
||||
|
||||
# Check if this is a leaf node (at maximum depth)
|
||||
if depth + prefix_len >= leaf_depth:
|
||||
# This is a leaf node - try to extract values
|
||||
self._collect_leaf_elements(node, current_prefix)
|
||||
else:
|
||||
# This is an inner node - recurse into children
|
||||
# Inner nodes contain pointers to other nodes
|
||||
# The structure is complex and varies by layout type
|
||||
# For now, we'll use a best-effort approach
|
||||
pass
|
||||
|
||||
except (gdb.error, gdb.MemoryError, ValueError) as e:
|
||||
# Skip nodes that can't be accessed
|
||||
pass
|
||||
|
||||
def _collect_leaf_elements(self, leaf_node, prefix):
|
||||
"""Collect elements from a leaf node.
|
||||
|
||||
Args:
|
||||
leaf_node: The leaf node_head
|
||||
prefix: Key prefix for elements in this leaf
|
||||
"""
|
||||
try:
|
||||
# Leaf nodes store the actual values
|
||||
# The exact structure depends on the layout type
|
||||
# Since the compiler may optimize away structure details,
|
||||
# we use a heuristic approach
|
||||
|
||||
# For now, we acknowledge that without full tree traversal support,
|
||||
# we can't reliably extract all elements
|
||||
# This would require implementing the full tree traversal logic
|
||||
# which is complex given GDB's limitations
|
||||
pass
|
||||
except (gdb.error, gdb.MemoryError):
|
||||
pass
|
||||
|
||||
def __len__(self):
|
||||
"""Return the number of elements in the tree."""
|
||||
elements = self._collect_elements()
|
||||
return len(elements)
|
||||
|
||||
def __iter__(self):
|
||||
"""Iterate over (key, value) pairs in the tree in ascending key order.
|
||||
|
||||
Yields:
|
||||
Tuples of (key, value) where key is the integer index and value is the stored element.
|
||||
"""
|
||||
elements = self._collect_elements()
|
||||
for key, value in elements:
|
||||
yield (key, value)
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""Get value at given key (column_id).
|
||||
|
||||
Args:
|
||||
key: Integer key (column_id) to look up
|
||||
|
||||
Returns:
|
||||
The value at the given key
|
||||
|
||||
Raises:
|
||||
KeyError: If key not found in tree
|
||||
"""
|
||||
elements = self._collect_elements()
|
||||
for k, v in elements:
|
||||
if k == key:
|
||||
return v
|
||||
raise KeyError(f"Key {key} not found in compact_radix_tree")
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""Get value at given key, or default if not found.
|
||||
|
||||
Args:
|
||||
key: Integer key to look up
|
||||
default: Value to return if key not found
|
||||
|
||||
Returns:
|
||||
The value at the given key, or default if not found
|
||||
"""
|
||||
try:
|
||||
return self[key]
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
def keys(self):
|
||||
"""Return a list of all keys in the tree."""
|
||||
elements = self._collect_elements()
|
||||
return [k for k, v in elements]
|
||||
|
||||
def values(self):
|
||||
"""Return a list of all values in the tree."""
|
||||
elements = self._collect_elements()
|
||||
return [v for k, v in elements]
|
||||
|
||||
def items(self):
|
||||
"""Return a list of (key, value) tuples."""
|
||||
return list(self._collect_elements())
|
||||
|
||||
def to_string(self):
|
||||
"""Return a string representation for printing."""
|
||||
if self.is_empty():
|
||||
if self.root['_base_layout'] == 0:
|
||||
return '<empty>'
|
||||
|
||||
# Try to provide more useful information
|
||||
try:
|
||||
elements = self._collect_elements()
|
||||
if elements:
|
||||
keys = [k for k, v in elements]
|
||||
return f'compact_radix_tree with {len(elements)} element(s), keys: {keys}'
|
||||
else:
|
||||
# We know it's not empty but couldn't collect elements
|
||||
# This happens when compiler optimizations prevent tree traversal
|
||||
try:
|
||||
size = int(self.root['_size'])
|
||||
layout = int(self.root['_base_layout'])
|
||||
return f'compact_radix_tree with size={size}, layout={layout} @ {hex(int(self.root.address))} (elements not accessible, use debug build for full introspection)'
|
||||
except (gdb.error, gdb.MemoryError, ValueError, AttributeError):
|
||||
return f'compact_radix_tree @ {hex(int(self.root.address))} (structure not fully accessible)'
|
||||
except (gdb.error, gdb.MemoryError, ValueError, AttributeError) as e:
|
||||
# Fallback to simple representation
|
||||
return f'compact_radix_tree @ {hex(int(self.root.address))} (error: {e})'
|
||||
# Compiler optimizes-away lots of critical stuff, so
|
||||
# for now just show where the tree is
|
||||
return 'compact radix tree @ 0x%x' % self.root
|
||||
|
||||
|
||||
class intrusive_btree:
|
||||
|
||||
@@ -1138,8 +1138,7 @@ private:
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
topology_request_tracking_mutation_builder trbuilder(global_request_id, _sp._features.topology_requests_type_column);
|
||||
trbuilder.set_truncate_table_data(table_id)
|
||||
.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
.set("done", false);
|
||||
|
||||
if (!_sp._features.topology_global_request_queue) {
|
||||
builder.set_global_topology_request(global_topology_request::truncate_table)
|
||||
|
||||
@@ -4940,7 +4940,6 @@ future<> storage_service::do_clusterwide_vnodes_cleanup() {
|
||||
builder.queue_global_topology_request_id(request_id);
|
||||
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::cleanup);
|
||||
muts.push_back(rtbuilder.build());
|
||||
} else {
|
||||
@@ -5265,7 +5264,6 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
|
||||
builder.queue_global_topology_request_id(request_id);
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::new_cdc_generation);
|
||||
muts.push_back(rtbuilder.build());
|
||||
} else {
|
||||
@@ -6822,7 +6820,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
|
||||
});
|
||||
}
|
||||
|
||||
auto ts = db_clock::now();
|
||||
for (const auto& token : tokens) {
|
||||
auto tid = tmap.get_tablet_id(token);
|
||||
auto& tinfo = tmap.get_tablet_info(tid);
|
||||
@@ -6836,20 +6833,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
|
||||
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
|
||||
.set_repair_task_info(last_token, repair_task_info, _feature_service)
|
||||
.build());
|
||||
db::system_keyspace::repair_task_entry entry{
|
||||
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
|
||||
.operation = db::system_keyspace::repair_task_operation::requested,
|
||||
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
|
||||
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
|
||||
.timestamp = ts,
|
||||
.table_uuid = table,
|
||||
};
|
||||
if (_feature_service.tablet_repair_tasks_table) {
|
||||
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
|
||||
for (auto& m : cmuts) {
|
||||
updates.push_back(std::move(m));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);
|
||||
|
||||
@@ -136,17 +136,6 @@ db::tablet_options combine_tablet_options(R&& opts) {
|
||||
return combined_opts;
|
||||
}
|
||||
|
||||
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
|
||||
auto tokens_view = s | std::views::split(delimiter)
|
||||
| std::views::transform([](auto&& range) {
|
||||
return std::string_view(&*range.begin(), std::ranges::distance(range));
|
||||
})
|
||||
| std::views::transform([](std::string_view sv) {
|
||||
return locator::tablet_id(std::stoul(std::string(sv)));
|
||||
});
|
||||
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
|
||||
}
|
||||
|
||||
// Used to compare different migration choices in regard to impact on load imbalance.
|
||||
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
|
||||
struct migration_badness {
|
||||
@@ -904,8 +893,6 @@ public:
|
||||
co_await coroutine::maybe_yield();
|
||||
auto& config = tmap.repair_scheduler_config();
|
||||
auto now = db_clock::now();
|
||||
auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
|
||||
auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
|
||||
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
|
||||
auto gid = locator::global_tablet_id{table, id};
|
||||
// Skip tablet that is in transitions.
|
||||
@@ -926,11 +913,6 @@ public:
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (skip_tablets.contains(id)) {
|
||||
lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
|
||||
co_return;
|
||||
}
|
||||
|
||||
// Avoid rescheduling a failed tablet repair in a loop
|
||||
// TODO: Allow user to config
|
||||
const auto min_reschedule_time = std::chrono::seconds(5);
|
||||
|
||||
@@ -10,10 +10,10 @@
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "service/task_manager_module.hh"
|
||||
#include "tasks/task_handler.hh"
|
||||
#include "tasks/virtual_task_hint.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
|
||||
namespace service {
|
||||
@@ -58,9 +58,14 @@ static std::optional<tasks::task_stats> maybe_make_task_stats(const locator::tab
|
||||
.kind = tasks::task_kind::cluster,
|
||||
.scope = get_scope(task_info.request_type),
|
||||
.state = tasks::task_manager::task_state::running,
|
||||
.sequence_number = 0,
|
||||
.keyspace = schema->ks_name(),
|
||||
.table = schema->cf_name(),
|
||||
.start_time = task_info.request_time
|
||||
.entity = "",
|
||||
.shard = 0,
|
||||
.creation_time = task_info.request_time,
|
||||
.start_time = task_info.sched_time,
|
||||
.end_time = db_clock::time_point{}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -110,16 +115,6 @@ future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(ta
|
||||
tid = tmap.next_tablet(*tid);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the task id is present in the repair task table
|
||||
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
|
||||
if (progress && progress->requested > 0) {
|
||||
co_return tasks::virtual_task_hint{
|
||||
.table_id = progress->table_uuid,
|
||||
.task_type = locator::tablet_task_type::user_repair,
|
||||
.tablet_id = std::nullopt,
|
||||
};
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -236,7 +231,8 @@ static void update_status(const locator::tablet_task_info& task_info, tasks::tas
|
||||
sched_nr += task_info.sched_nr;
|
||||
status.type = locator::tablet_task_type_to_string(task_info.request_type);
|
||||
status.scope = get_scope(task_info.request_type);
|
||||
status.start_time = task_info.request_time;
|
||||
status.creation_time = task_info.request_time;
|
||||
status.start_time = task_info.sched_time;
|
||||
}
|
||||
|
||||
future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(tasks::task_id id, tasks::virtual_task_hint hint) {
|
||||
@@ -254,20 +250,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
|
||||
size_t sched_nr = 0;
|
||||
auto tmptr = _ss.get_token_metadata_ptr();
|
||||
auto& tmap = tmptr->tablets().get_tablet_map(table);
|
||||
bool repair_task_finished = false;
|
||||
bool repair_task_pending = false;
|
||||
if (is_repair_task(task_type)) {
|
||||
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
|
||||
if (progress) {
|
||||
res.status.progress.completed = progress->finished;
|
||||
res.status.progress.total = progress->requested;
|
||||
res.status.progress_units = "tablets";
|
||||
if (progress->requested > 0 && progress->requested == progress->finished) {
|
||||
repair_task_finished = true;
|
||||
} if (progress->requested > 0 && progress->requested > progress->finished) {
|
||||
repair_task_pending = true;
|
||||
}
|
||||
}
|
||||
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
|
||||
auto& task_info = info.repair_task_info;
|
||||
if (task_info.tablet_task_id.uuid() == id.uuid()) {
|
||||
@@ -299,17 +282,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
|
||||
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
|
||||
co_return res;
|
||||
}
|
||||
|
||||
if (repair_task_pending) {
|
||||
// When repair_task_pending is true, the res.tablets will be empty iff the request is aborted by user.
|
||||
res.status.state = res.tablets.empty() ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::running;
|
||||
co_return res;
|
||||
}
|
||||
if (repair_task_finished) {
|
||||
res.status.state = tasks::task_manager::task_state::done;
|
||||
co_return res;
|
||||
}
|
||||
|
||||
// FIXME: Show finished tasks.
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
|
||||
@@ -956,6 +956,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
req_entry = co_await _sys_ks.get_topology_request_entry(req_id, true);
|
||||
req = std::get<global_topology_request>(req_entry.request_type);
|
||||
}
|
||||
|
||||
switch (req) {
|
||||
case global_topology_request::new_cdc_generation: {
|
||||
rtlogger.info("new CDC generation requested");
|
||||
@@ -975,9 +976,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_global_topology_request(req)
|
||||
.set_global_topology_request_id(req_id)
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
|
||||
|
||||
// Set start_time when we begin executing the request
|
||||
topology_request_tracking_mutation_builder rtbuilder(req_id);
|
||||
rtbuilder.set("start_time", db_clock::now());
|
||||
|
||||
auto reason = ::format(
|
||||
"insert CDC generation data (UUID: {})", gen_uuid);
|
||||
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
|
||||
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build(), rtbuilder.build()}, reason);
|
||||
}
|
||||
break;
|
||||
case global_topology_request::cleanup:
|
||||
@@ -1068,7 +1074,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
|
||||
.build()));
|
||||
// Set start_time when we begin executing the request and mark as done
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
|
||||
.set("start_time", db_clock::now())
|
||||
.done(error)
|
||||
.build()));
|
||||
|
||||
@@ -1088,7 +1096,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_global_topology_request_id(req_id)
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
|
||||
.set_session(session_id(req_id));
|
||||
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
|
||||
|
||||
// Set start_time when we begin executing the request
|
||||
topology_request_tracking_mutation_builder rtbuilder(req_id);
|
||||
rtbuilder.set("start_time", db_clock::now());
|
||||
|
||||
co_await update_topology_state(std::move(guard), {builder.build(), rtbuilder.build()}, "TRUNCATE TABLE requested");
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -1205,8 +1218,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
|
||||
// Record the repair_time returned by the repair_tablet rpc call
|
||||
db_clock::time_point repair_time;
|
||||
// Record the repair task update mutations
|
||||
utils::chunked_vector<canonical_mutation> repair_task_updates;
|
||||
service::session_id session_id;
|
||||
};
|
||||
|
||||
@@ -1739,14 +1750,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
dst = dst_opt.value().host;
|
||||
}
|
||||
// Update repair task
|
||||
db::system_keyspace::repair_task_entry entry{
|
||||
.task_uuid = tasks::task_id(tinfo.repair_task_info.tablet_task_id.uuid()),
|
||||
.operation = db::system_keyspace::repair_task_operation::finished,
|
||||
.first_token = dht::token::to_int64(tmap.get_first_token(gid.tablet)),
|
||||
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
|
||||
.table_uuid = gid.table,
|
||||
};
|
||||
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
|
||||
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
|
||||
service::session_id::create_random_id() : trinfo->session_id;
|
||||
@@ -1755,10 +1758,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
|
||||
auto& tablet_state = _tablets[tablet];
|
||||
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
|
||||
if (_feature_service.tablet_repair_tasks_table) {
|
||||
entry.timestamp = db_clock::now();
|
||||
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
|
||||
}
|
||||
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
|
||||
dst, tablet, duration, res.repair_time);
|
||||
})) {
|
||||
@@ -1777,9 +1776,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
|
||||
.del_repair_task_info(last_token, _feature_service)
|
||||
.del_session(last_token);
|
||||
for (auto& m : tablet_state.repair_task_updates) {
|
||||
updates.push_back(std::move(m));
|
||||
}
|
||||
// Skip update repair time in case hosts filter or dcs filter is set.
|
||||
if (valid && is_filter_off) {
|
||||
auto sched_time = tinfo.repair_task_info.sched_time;
|
||||
@@ -3296,6 +3292,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.del_global_topology_request();
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
// Set start_time when we begin executing the request
|
||||
topology_request_tracking_mutation_builder start_rtbuilder(*global_request_id);
|
||||
start_rtbuilder.set("start_time", db_clock::now());
|
||||
muts.emplace_back(start_rtbuilder.build());
|
||||
|
||||
topology_request_tracking_mutation_builder rtbuilder(*global_request_id);
|
||||
builder.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, *global_request_id);
|
||||
|
||||
@@ -2117,14 +2117,11 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
|
||||
}
|
||||
|
||||
sstable_id sid;
|
||||
// Force a random sstable_id for testing purposes
|
||||
bool random_sstable_identifier = utils::get_local_injector().is_enabled("random_sstable_identifier");
|
||||
if (!random_sstable_identifier && generation().is_uuid_based()) {
|
||||
if (generation().is_uuid_based()) {
|
||||
sid = sstable_id(generation().as_uuid());
|
||||
} else {
|
||||
sid = sstable_id(utils::UUID_gen::get_time_UUID());
|
||||
auto msg = random_sstable_identifier ? "forced random sstable_id" : "has numerical generation";
|
||||
sstlog.info("SSTable {} {}. SSTable identifier in scylla_metadata set to {}", get_filename(), msg, sid);
|
||||
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
|
||||
}
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableIdentifier>(scylla_metadata::sstable_identifier{sid});
|
||||
|
||||
@@ -2543,11 +2540,8 @@ std::vector<std::pair<component_type, sstring>> sstable::all_components() const
|
||||
return all;
|
||||
}
|
||||
|
||||
future<generation_type> sstable::snapshot(const sstring& dir, bool use_sstable_identifier) const {
|
||||
// Use the sstable identifier UUID if available to enable global de-duplication of sstables in backup.
|
||||
generation_type gen = (use_sstable_identifier && _sstable_identifier) ? generation_type(_sstable_identifier->uuid()) : _generation;
|
||||
co_await _storage->snapshot(*this, dir, storage::absolute_path::yes, gen);
|
||||
co_return gen;
|
||||
future<> sstable::snapshot(const sstring& dir) const {
|
||||
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
|
||||
}
|
||||
|
||||
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {
|
||||
|
||||
@@ -397,10 +397,6 @@ public:
|
||||
return _version;
|
||||
}
|
||||
|
||||
format_types get_format() const {
|
||||
return _format;
|
||||
}
|
||||
|
||||
// Returns the total bytes of all components.
|
||||
uint64_t bytes_on_disk() const;
|
||||
file_size_stats get_file_size_stats() const;
|
||||
@@ -442,10 +438,7 @@ public:
|
||||
|
||||
std::vector<std::pair<component_type, sstring>> all_components() const;
|
||||
|
||||
// When use_sstable_identifier is true and the sstable identifier is available,
|
||||
// use it to name the sstable in the snapshot, rather than the sstable generation.
|
||||
// Returns the generation used for snapshot.
|
||||
future<generation_type> snapshot(const sstring& dir, bool use_sstable_identifier = false) const;
|
||||
future<> snapshot(const sstring& dir) const;
|
||||
|
||||
// Delete the sstable by unlinking all sstable files
|
||||
// Ignores all errors.
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "tasks/task_handler.hh"
|
||||
#include "tasks/virtual_task_hint.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
|
||||
#include <seastar/core/with_timeout.hh>
|
||||
|
||||
@@ -19,6 +20,11 @@ namespace tasks {
|
||||
|
||||
using task_status_variant = std::variant<tasks::task_manager::foreign_task_ptr, tasks::task_manager::task::task_essentials>;
|
||||
|
||||
// Derives the creation time of a task from the timestamp embedded in its id.
// NOTE(review): this relies on the id being a time-based (version 1) UUID;
// confirm that holds for every task_id that can reach this helper.
static db_clock::time_point get_creation_time_from_task_id(task_id id) {
    const auto since_epoch = utils::UUID_gen::unix_timestamp(id.uuid());
    return db_clock::time_point(since_epoch);
}
|
||||
|
||||
static future<task_status> get_task_status(task_manager::task_ptr task) {
|
||||
auto host_id = task->get_module()->get_task_manager().get_host_id();
|
||||
auto local_task_status = task->get_status();
|
||||
@@ -29,6 +35,7 @@ static future<task_status> get_task_status(task_manager::task_ptr task) {
|
||||
.scope = local_task_status.scope,
|
||||
.state = local_task_status.state,
|
||||
.is_abortable = task->is_abortable(),
|
||||
.creation_time = get_creation_time_from_task_id(local_task_status.id),
|
||||
.start_time = local_task_status.start_time,
|
||||
.end_time = local_task_status.end_time,
|
||||
.error = local_task_status.error,
|
||||
@@ -173,6 +180,7 @@ future<utils::chunked_vector<task_status>> task_handler::get_status_recursively(
|
||||
.scope = task.task_status.scope,
|
||||
.state = task.task_status.state,
|
||||
.is_abortable = task.abortable,
|
||||
.creation_time = get_creation_time_from_task_id(task.task_status.id),
|
||||
.start_time = task.task_status.start_time,
|
||||
.end_time = task.task_status.end_time,
|
||||
.error = task.task_status.error,
|
||||
|
||||
@@ -26,6 +26,7 @@ struct task_status {
|
||||
std::string scope;
|
||||
task_manager::task_state state;
|
||||
is_abortable is_abortable;
|
||||
db_clock::time_point creation_time;
|
||||
db_clock::time_point start_time;
|
||||
db_clock::time_point end_time;
|
||||
std::string error;
|
||||
@@ -51,6 +52,7 @@ struct task_stats {
|
||||
std::string table;
|
||||
std::string entity;
|
||||
unsigned shard;
|
||||
db_clock::time_point creation_time;
|
||||
db_clock::time_point start_time;
|
||||
db_clock::time_point end_time;
|
||||
};
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "tasks/task_handler.hh"
|
||||
#include "task_manager.hh"
|
||||
@@ -559,6 +560,7 @@ future<utils::chunked_vector<task_stats>> task_manager::module::get_stats(is_int
|
||||
.table = task->get_status().table,
|
||||
.entity = task->get_status().entity,
|
||||
.shard = task->get_status().shard,
|
||||
.creation_time = db_clock::time_point(utils::UUID_gen::unix_timestamp(task->id().uuid())),
|
||||
.start_time = task->get_status().start_time,
|
||||
.end_time = task->get_status().end_time,
|
||||
});
|
||||
|
||||
@@ -31,7 +31,6 @@
|
||||
#include "replica/database.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/lister.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
@@ -39,7 +38,6 @@
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/generation_type.hh"
|
||||
#include "sstables/sstable_version.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
@@ -53,7 +51,6 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "replica/mutation_dump.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using namespace sstables;
|
||||
@@ -615,13 +612,13 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
|
||||
});
|
||||
}
|
||||
|
||||
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
|
||||
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", bool skip_flush = false) {
|
||||
try {
|
||||
auto uuid = e.db().local().find_uuid(ks_name, cf_name);
|
||||
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts);
|
||||
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, skip_flush);
|
||||
} catch (...) {
|
||||
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={} use_sstable_identifier={}: {}",
|
||||
ks_name, cf_name, snapshot_name, opts.skip_flush, opts.use_sstable_identifier, std::current_exception());
|
||||
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
|
||||
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@@ -635,37 +632,6 @@ future<std::set<sstring>> collect_files(fs::path path) {
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
static bool is_component(const sstring& fname, const sstring& suffix) {
|
||||
return fname.ends_with(suffix);
|
||||
}
|
||||
|
||||
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
|
||||
// Verify manifest against the files in the snapshots dir
|
||||
auto pred = [&suffix] (const sstring& fname) {
|
||||
return is_component(fname, suffix);
|
||||
};
|
||||
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
|
||||
}
|
||||
|
||||
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
|
||||
static future<> validate_manifest(const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir) {
|
||||
sstring suffix = "-Data.db";
|
||||
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
|
||||
|
||||
std::set<sstring> sstables_in_manifest;
|
||||
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
|
||||
auto manifest_json = rjson::parse(manifest_str);
|
||||
auto& manifest_files = manifest_json["files"];
|
||||
BOOST_REQUIRE(manifest_files.IsArray());
|
||||
for (auto& f : manifest_files.GetArray()) {
|
||||
if (is_component(f.GetString(), suffix)) {
|
||||
sstables_in_manifest.insert(f.GetString());
|
||||
}
|
||||
}
|
||||
testlog.debug("SSTables in manifest.json: {}", fmt::join(sstables_in_manifest, ", "));
|
||||
BOOST_REQUIRE_EQUAL(sstables_in_snapshot, sstables_in_manifest);
|
||||
}
|
||||
|
||||
static future<> snapshot_works(const std::string& table_name) {
|
||||
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
|
||||
take_snapshot(e, "ks", table_name).get();
|
||||
@@ -685,8 +651,6 @@ static future<> snapshot_works(const std::string& table_name) {
|
||||
// all files were copied and manifest was generated
|
||||
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
|
||||
|
||||
validate_manifest(snapshot_dir, in_snapshot_dir).get();
|
||||
|
||||
return make_ready_future<>();
|
||||
}, true);
|
||||
}
|
||||
@@ -705,8 +669,7 @@ SEASTAR_TEST_CASE(index_snapshot_works) {
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
|
||||
db::snapshot_options opts = {.skip_flush = true};
|
||||
take_snapshot(e, "ks", "cf", "test", opts).get();
|
||||
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
|
||||
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
|
||||
@@ -719,41 +682,6 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
});
|
||||
}
|
||||
|
||||
// Takes a snapshot with use_sstable_identifier set and checks that the snapshot
// directory holds as many files as the table directory (plus manifest.json and
// schema.cql) but under different names, and that manifest.json matches the
// snapshot contents.
SEASTAR_TEST_CASE(snapshot_use_sstable_identifier_works) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
    return make_ready_future<>();
#endif
    sstring table_name = "cf";
    // Without the injection the sstable identifier equals the sstable generation,
    // so the snapshot names would not differ from the table-directory names.
    // NOTE(review): the injection is enabled here and is not disabled anywhere
    // in this test case - confirm that this is intended.
    utils::get_local_injector().enable("random_sstable_identifier", false);
    return do_with_some_data({table_name}, [table_name] (cql_test_env& e) -> future<> {
        sstring snapshot_tag = "test";
        co_await take_snapshot(e, "ks", table_name, snapshot_tag, db::snapshot_options{.use_sstable_identifier = true});

        auto& table = e.local_db().find_column_family("ks", table_name);
        auto base_dir = table_dir(table);
        auto snapshot_dir = base_dir / sstables::snapshots_dir / snapshot_tag;

        // The snapshot flushed the table, so its data files must already be on disk.
        auto in_table_dir = co_await collect_files(base_dir);
        BOOST_REQUIRE_GE(in_table_dir.size(), 9);
        testlog.info("Files in table dir: {}", fmt::join(in_table_dir, ", "));

        auto in_snapshot_dir = co_await collect_files(snapshot_dir);
        testlog.info("Files in snapshot dir: {}", fmt::join(in_snapshot_dir, ", "));

        // The snapshot additionally contains the manifest and the schema dump.
        in_table_dir.insert("manifest.json");
        in_table_dir.insert("schema.cql");
        // Same number of files, but not the same set of names.
        BOOST_REQUIRE_EQUAL(in_table_dir.size(), in_snapshot_dir.size());
        BOOST_REQUIRE_NE(in_table_dir, in_snapshot_dir);

        co_await validate_manifest(snapshot_dir, in_snapshot_dir);
    }, true);
}
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_list_okay) {
|
||||
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
@@ -1528,7 +1456,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
}
|
||||
BOOST_REQUIRE(found);
|
||||
|
||||
co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});
|
||||
co_await take_snapshot(e, "ks", "cf", "test", true /* skip_flush */);
|
||||
|
||||
testlog.debug("Expected: {}", expected);
|
||||
|
||||
|
||||
@@ -346,60 +346,4 @@ SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) {
|
||||
});
|
||||
}
|
||||
|
||||
// Exercises count_finished_tablets(r1, r2): each scenario states how many of
// the ranges in r1 are expected to be reported as covered by the ranges in r2.
SEASTAR_TEST_CASE(test_tablet_token_range_count) {
    using token_ranges = utils::chunked_vector<tablet_token_range>;

    {
        // A single wide r2 range contains the single r1 range.
        token_ranges r1 = {{10, 20}};
        token_ranges r2 = {{0, 100}};
        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
    }
    {
        // Overlapping r2 ranges: [0, 50] and [40, 100] together span [0, 100],
        // so r1 = [10, 90] counts as covered.
        token_ranges r1 = {{10, 90}};
        token_ranges r2 = {{0, 50}, {40, 100}};
        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
    }
    {
        // Adjacent r2 ranges: [0, 10] and [11, 20] together span [0, 20],
        // so r1 = [5, 15] counts as covered.
        token_ranges r1 = {{5, 15}};
        token_ranges r2 = {{0, 10}, {11, 20}};
        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
    }
    {
        // Partial overlap is not enough. With r2 = [0, 10]:
        //   [5, 15] ends past r2, [-5, 5] starts before r2 -> neither is counted.
        token_ranges r1 = {{5, 15}, {-5, 5}};
        token_ranges r2 = {{0, 10}};
        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 0);
    }
    {
        // One r2 range covers several separate r1 ranges; each is counted.
        token_ranges r1 = {{10, 20}, {30, 40}, {50, 60}};
        token_ranges r2 = {{0, 100}};
        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 3);
    }
    {
        // Unsorted input on both sides. r2 amounts to [0, 40] and [50, 100],
        // which cover both r1 ranges.
        token_ranges r1 = {{50, 60}, {10, 20}};
        token_ranges r2 = {{50, 100}, {0, 40}};
        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 2);
    }
    {
        // An empty list on either side yields zero.
        token_ranges r1 = {{10, 20}};
        token_ranges r2_empty = {};
        token_ranges r1_empty = {};
        token_ranges r2 = {{0, 100}};

        BOOST_REQUIRE(co_await count_finished_tablets(r1, r2_empty) == 0);
        BOOST_REQUIRE(co_await count_finished_tablets(r1_empty, r2) == 0);
    }
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -16,26 +16,16 @@ from test.cluster.util import get_topology_coordinator, new_test_keyspace, recon
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# This test makes sure that view building is done mainly in the streaming
|
||||
# scheduling group. We check that by grepping all relevant logs in TRACE mode
|
||||
# and verifying that they come from the streaming scheduling group.
|
||||
#
|
||||
# For more context, see: https://github.com/scylladb/scylladb/issues/21232.
|
||||
# This test reproduces the issue in non-tablet mode.
|
||||
# This test makes sure that view building is done mainly in the streaming scheduling group
|
||||
# and not the gossip scheduling group. We do that by measuring the time each group was
|
||||
# busy during the view building process and confirming that the gossip group was busy
|
||||
# much less than the streaming group.
|
||||
# Reproduces https://github.com/scylladb/scylladb/issues/21232
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('debug', 'the test needs to do some work which takes too much time in debug mode')
|
||||
async def test_view_building_scheduling_group(manager: ManagerClient):
|
||||
# Note: The view building coordinator works in the gossiping scheduling group,
|
||||
# and we intentionally omit it here.
|
||||
# Note: We include "view" for keyspaces that don't use the view building coordinator
|
||||
# and will follow the legacy path instead.
|
||||
loggers = ["view_building_worker", "view_consumer", "view_update_generator", "view"]
|
||||
# Flatten the list of lists.
|
||||
cmdline = sum([["--logger-log-level", f"{logger}=trace"] for logger in loggers], [])
|
||||
|
||||
server = await manager.server_add(cmdline=cmdline)
|
||||
server = await manager.server_add()
|
||||
cql = manager.get_cql()
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (p int, c int, PRIMARY KEY (p, c))")
|
||||
|
||||
@@ -45,30 +35,21 @@ async def test_view_building_scheduling_group(manager: ManagerClient):
|
||||
batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(inserts) + "\nAPPLY BATCH\n"
|
||||
await manager.cql.run_async(batch)
|
||||
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
mark = await log.mark()
|
||||
metrics_before = await manager.metrics.query(server.ip_addr)
|
||||
ms_gossip_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
|
||||
ms_streaming_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
|
||||
|
||||
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT p, c FROM {ks}.tab WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
|
||||
await wait_for_view(cql, 'mv', 1)
|
||||
|
||||
logger_alternative = "|".join(loggers)
|
||||
pattern = rf"\[shard [0-9]+:(.+)\] ({logger_alternative}) - "
|
||||
|
||||
results = await log.grep(pattern, from_mark=mark)
|
||||
# Sanity check. If there are no logs, something's wrong.
|
||||
assert len(results) > 0
|
||||
|
||||
# In case of non-tablet keyspaces, we won't use the view building coordinator.
|
||||
# Instead, view updates will follow the legacy path. Along the way, we'll observe
|
||||
# this message, which will be printed using another scheduling group, so let's
|
||||
# filter it out.
|
||||
predicate = lambda result: f"Building view {ks}.mv, starting at token" not in result[0]
|
||||
results = list(filter(predicate, results))
|
||||
|
||||
# Take the first parenthesized match for each result, i.e. the scheduling group.
|
||||
sched_groups = [matches[1] for _, matches in results]
|
||||
|
||||
assert all(sched_group == "strm" for sched_group in sched_groups)
|
||||
metrics_after = await manager.metrics.query(server.ip_addr)
|
||||
ms_gossip_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
|
||||
ms_streaming_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
|
||||
ms_streaming = ms_streaming_after - ms_streaming_before
|
||||
ms_statement = ms_gossip_after - ms_gossip_before
|
||||
ratio = ms_statement / ms_streaming
|
||||
print(f"ms_streaming: {ms_streaming}, ms_statement: {ms_statement}, ratio: {ratio}")
|
||||
assert ratio < 0.1
|
||||
|
||||
# A sanity check test ensures that starting and shutting down Scylla when view building is
|
||||
# disabled is conducted properly and we don't run into any issues.
|
||||
|
||||
@@ -38,6 +38,7 @@ class TaskStats(NamedTuple):
|
||||
entity: str
|
||||
sequence_number: SequenceNum
|
||||
shard: int
|
||||
creation_time: str
|
||||
start_time: str
|
||||
end_time: str
|
||||
|
||||
@@ -54,6 +55,7 @@ class TaskStatus(NamedTuple):
|
||||
entity: str
|
||||
sequence_number: SequenceNum
|
||||
is_abortable: bool
|
||||
creation_time: str
|
||||
start_time: str
|
||||
end_time: str
|
||||
error: str
|
||||
|
||||
@@ -43,86 +43,6 @@ async def guarantee_repair_time_next_second():
|
||||
# different than the previous one.
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def do_test_tablet_repair_progress_split_merge(manager: ManagerClient, do_split=False, do_merge=False):
|
||||
nr_tablets = 16
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=True, tablets=nr_tablets)
|
||||
token = 'all'
|
||||
logs = []
|
||||
for s in servers:
|
||||
logs.append(await manager.server_open_log(s.server_id))
|
||||
|
||||
# Skip repair for the listed tablet id
|
||||
nr_tablets_skipped = 4
|
||||
nr_tablets_repaired = nr_tablets - nr_tablets_skipped
|
||||
await inject_error_on(manager, "tablet_repair_skip_sched", servers, params={'value':"0,1,5,8"})
|
||||
|
||||
# Request to repair all tablets
|
||||
repair_res = await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
|
||||
logging.info(f'{repair_res=}')
|
||||
tablet_task_id = repair_res['tablet_task_id']
|
||||
|
||||
async def get_task_status(desc):
|
||||
task_status = await manager.api.get_task_status(servers[0].ip_addr, tablet_task_id)
|
||||
completed = int(task_status['progress_completed'])
|
||||
total = int(task_status['progress_total'])
|
||||
logging.info(f'{desc=} {completed=} {total=} {task_status=}')
|
||||
return completed, total
|
||||
|
||||
async def wait_task_progress(wanted_complete, wanted_total):
|
||||
while True:
|
||||
completed, total = await get_task_status("wait_task_progress")
|
||||
if completed == wanted_complete and total == wanted_total:
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def get_task_status_and_check(desc):
|
||||
completed, total = await get_task_status(desc)
|
||||
assert completed == nr_tablets_repaired
|
||||
assert total == nr_tablets
|
||||
|
||||
# 12 out of 16 tablets should finish
|
||||
await wait_task_progress(nr_tablets_repaired, nr_tablets)
|
||||
|
||||
if do_split:
|
||||
await get_task_status_and_check("before_split")
|
||||
|
||||
s1_mark = await logs[0].mark()
|
||||
await inject_error_on(manager, "tablet_force_tablet_count_increase", servers)
|
||||
await logs[0].wait_for('Detected tablet split for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_increase", servers)
|
||||
|
||||
await get_task_status_and_check("after_split")
|
||||
|
||||
if do_merge:
|
||||
await get_task_status_and_check("before_merge")
|
||||
|
||||
s1_mark = await logs[0].mark()
|
||||
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
await logs[0].wait_for('Detected tablet merge for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
|
||||
await get_task_status_and_check("after_merge")
|
||||
|
||||
# Wait for all repair to finish after all tablets can be scheduled to run repair
|
||||
await inject_error_off(manager, "tablet_repair_skip_sched", servers)
|
||||
await wait_task_progress(nr_tablets, nr_tablets)
|
||||
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_progress(manager: ManagerClient):
|
||||
await do_test_tablet_repair_progress_split_merge(manager, do_split=False, do_merge=False)
|
||||
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_progress_split(manager: ManagerClient):
|
||||
await do_test_tablet_repair_progress_split_merge(manager, do_split=True)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/26844")
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_repair_progress_merge(manager: ManagerClient):
|
||||
await do_test_tablet_repair_progress_split_merge(manager, do_merge=True)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_manual_repair(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
from typing import Any
|
||||
from cassandra.query import ConsistencyLevel, SimpleStatement
|
||||
from cassandra.policies import FallthroughRetryPolicy
|
||||
from cassandra.query import ConsistencyLevel
|
||||
|
||||
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
@@ -1597,7 +1596,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
|
||||
async def truncate_table():
|
||||
await asyncio.sleep(10)
|
||||
logger.info("Executing truncate during bootstrap")
|
||||
await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))
|
||||
await cql.run_async(f"TRUNCATE {ks}.test USING TIMEOUT 1m")
|
||||
|
||||
truncate_task = asyncio.create_task(truncate_table())
|
||||
logger.info("Adding fourth node")
|
||||
|
||||
@@ -115,7 +115,7 @@ def compact_keyspace(cql, ks, flush_memtables=True):
|
||||
args.extend([ks, cf])
|
||||
run_nodetool(cql, "compact", *args)
|
||||
|
||||
def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
|
||||
def take_snapshot(cql, table, tag, skip_flush):
|
||||
ks, cf = table.split('.')
|
||||
if has_rest_api(cql):
|
||||
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
|
||||
@@ -123,8 +123,6 @@ def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
|
||||
args = ['--tag', tag, '--table', cf]
|
||||
if skip_flush:
|
||||
args.append('--skip-flush')
|
||||
if use_sstable_identifier:
|
||||
args.append('--use-sstable-identifier')
|
||||
args.append(ks)
|
||||
run_nodetool(cql, "snapshot", *args)
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
|
||||
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
|
||||
|
||||
|
||||
def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=False):
|
||||
def check_snapshot_out(res, tag, ktlist, skip_flush):
|
||||
"""Check that the output of nodetool snapshot contains the expected messages"""
|
||||
|
||||
if len(ktlist) == 0:
|
||||
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=Fals
|
||||
pattern = re.compile("Requested creating snapshot\\(s\\)"
|
||||
f" for \\[{keyspaces}\\]"
|
||||
f" with snapshot name \\[(.+)\\]"
|
||||
f" and options \\{{skip_flush={str(skip_flush).lower()}, use_sstable_identifier={str(use_sstable_identifier).lower()}\\}}")
|
||||
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
|
||||
|
||||
print(res)
|
||||
print(pattern)
|
||||
@@ -138,13 +138,13 @@ def test_snapshot_keyspace(nodetool):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
|
||||
|
||||
@@ -155,13 +155,13 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
|
||||
@@ -186,7 +186,7 @@ class kn_param(NamedTuple):
|
||||
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
|
||||
tag = "my_snapshot"
|
||||
|
||||
req_params = {"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": param.kn}
|
||||
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
|
||||
if param.cf:
|
||||
req_params["cf"] = param.cf
|
||||
|
||||
@@ -202,19 +202,19 @@ def test_snapshot_ktlist(nodetool, option_name):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1.tbl1,ks2.tbl2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
|
||||
|
||||
@@ -229,8 +229,7 @@ def test_snapshot_ktlist(nodetool, option_name):
|
||||
{"ks": ["ks1", "ks2"], "tbl": []},
|
||||
])
|
||||
@pytest.mark.parametrize("skip_flush", [False, True])
|
||||
@pytest.mark.parametrize("use_sstable_identifier", [False, True])
|
||||
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_identifier):
|
||||
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
|
||||
cmd = ["snapshot"]
|
||||
params = {}
|
||||
|
||||
@@ -243,11 +242,8 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
|
||||
|
||||
if skip_flush:
|
||||
cmd.append("--skip-flush")
|
||||
if use_sstable_identifier:
|
||||
cmd.append("--use-sstable-identifier")
|
||||
|
||||
params["sf"] = str(skip_flush).lower()
|
||||
params["use_sstable_identifier"] = str(use_sstable_identifier).lower()
|
||||
|
||||
if ktlist:
|
||||
if "tbl" in ktlist:
|
||||
@@ -277,7 +273,7 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
|
||||
expected_request("POST", "/storage_service/snapshots", params=params)
|
||||
])
|
||||
|
||||
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush, use_sstable_identifier)
|
||||
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
|
||||
|
||||
|
||||
def test_snapshot_multiple_keyspace_with_table(nodetool):
|
||||
|
||||
@@ -109,7 +109,6 @@ class ResourceGather(ABC):
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.critical(f"Process {args} timed out")
|
||||
p.kill()
|
||||
p.communicate()
|
||||
except KeyboardInterrupt:
|
||||
p.kill()
|
||||
raise
|
||||
|
||||
@@ -2362,23 +2362,16 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
|
||||
params["sf"] = "false";
|
||||
}
|
||||
|
||||
if (vm.contains("use-sstable-identifier")) {
|
||||
params["use_sstable_identifier"] = "true";
|
||||
} else {
|
||||
params["use_sstable_identifier"] = "false";
|
||||
}
|
||||
|
||||
client.post("/storage_service/snapshots", params);
|
||||
|
||||
if (kn_msg.empty()) {
|
||||
kn_msg = params["kn"];
|
||||
}
|
||||
|
||||
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skip_flush={}, use_sstable_identifier={}}}\n",
|
||||
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
|
||||
kn_msg,
|
||||
params["tag"],
|
||||
params["sf"],
|
||||
params["use_sstable_identifier"]);
|
||||
params["sf"]);
|
||||
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
|
||||
}
|
||||
|
||||
@@ -3181,7 +3174,7 @@ void tasks_print_status(const rjson::value& res) {
|
||||
auto status = res.GetObject();
|
||||
for (const auto& x: status) {
|
||||
if (x.value.IsString()) {
|
||||
if (strcmp(x.name.GetString(), "start_time") == 0 || strcmp(x.name.GetString(), "end_time") == 0) {
|
||||
if (strcmp(x.name.GetString(), "creation_time") == 0 || strcmp(x.name.GetString(), "start_time") == 0 || strcmp(x.name.GetString(), "end_time") == 0) {
|
||||
fmt::print("{}: {}\n", x.name.GetString(), get_time(x.value.GetString()));
|
||||
} else {
|
||||
fmt::print("{}: {}\n", x.name.GetString(), x.value.GetString());
|
||||
@@ -3233,6 +3226,7 @@ void tasks_add_tree_to_statuses_lists(Tabulate& table, const rjson::value& res)
|
||||
rjson::to_string_view(status["scope"]),
|
||||
rjson::to_string_view(status["state"]),
|
||||
status["is_abortable"].GetBool(),
|
||||
get_time(rjson::to_string_view(status["creation_time"])),
|
||||
get_time(rjson::to_string_view(status["start_time"])),
|
||||
get_time(rjson::to_string_view(status["end_time"])),
|
||||
rjson::to_string_view(status["error"]),
|
||||
@@ -3252,7 +3246,7 @@ void tasks_add_tree_to_statuses_lists(Tabulate& table, const rjson::value& res)
|
||||
void tasks_print_trees(const std::vector<rjson::value>& res) {
|
||||
Tabulate table;
|
||||
table.add("id", "type", "kind", "scope", "state",
|
||||
"is_abortable", "start_time", "end_time", "error", "parent_id",
|
||||
"is_abortable", "creation_time", "start_time", "end_time", "error", "parent_id",
|
||||
"sequence_number", "shard", "keyspace", "table", "entity",
|
||||
"progress_units", "total", "completed", "children_ids");
|
||||
|
||||
@@ -3266,7 +3260,7 @@ void tasks_print_trees(const std::vector<rjson::value>& res) {
|
||||
void tasks_print_stats_list(const rjson::value& res) {
|
||||
auto stats = res.GetArray();
|
||||
Tabulate table;
|
||||
table.add("task_id", "type", "kind", "scope", "state", "sequence_number", "keyspace", "table", "entity", "shard", "start_time", "end_time");
|
||||
table.add("task_id", "type", "kind", "scope", "state", "sequence_number", "keyspace", "table", "entity", "shard", "creation_time", "start_time", "end_time");
|
||||
for (auto& element : stats) {
|
||||
const auto& s = element.GetObject();
|
||||
|
||||
@@ -3280,6 +3274,7 @@ void tasks_print_stats_list(const rjson::value& res) {
|
||||
rjson::to_string_view(s["table"]),
|
||||
rjson::to_string_view(s["entity"]),
|
||||
s["shard"].GetUint(),
|
||||
get_time(rjson::to_string_view(s["creation_time"])),
|
||||
get_time(rjson::to_string_view(s["start_time"])),
|
||||
get_time(rjson::to_string_view(s["end_time"])));
|
||||
}
|
||||
@@ -4605,7 +4600,6 @@ For more information, see: {}
|
||||
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
|
||||
typed_option<sstring>("tag,t", "The name of the snapshot"),
|
||||
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
|
||||
typed_option<>("use-sstable-identifier", "Use the sstable identifier UUID, if available, rather than the sstable generation for the sstable file names within the snapshot dir and the manifest file"),
|
||||
},
|
||||
{
|
||||
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),
|
||||
|
||||
Reference in New Issue
Block a user