Files
scylladb/repair/task_manager_module.hh
Aleksandra Martyniuk e3e81a9a7a repair: throw if flush failed in get_flush_time
Previously, _flush_time was stored as a std::optional<gc_clock::time_point>,
and std::nullopt indicated that the flush was needed but failed. This was confusing
for callers and did not work as expected, since _flush_time was initialized
with a value rather than left empty.

Change _flush_time type to gc_clock::time_point. If a flush is needed but failed,
get_flush_time() throws an exception.

This was supposed to be part of https://github.com/scylladb/scylladb/pull/26319
but it was mistakenly overwritten during rebases.

Refs: https://github.com/scylladb/scylladb/issues/24415.

Closes scylladb/scylladb#26794
2025-12-04 11:45:53 +02:00

304 lines
12 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "node_ops/node_ops_ctl.hh"
#include "repair/repair.hh"
#include "service/topology_guard.hh"
#include "streaming/stream_reason.hh"
#include "tasks/task_manager.hh"
namespace repair {
/// Common base class for the repair-related task-manager tasks declared below.
///
/// It remembers the stream reason the repair was started with (which also
/// serves as the task type string) and reports progress in "ranges".
class repair_task_impl : public tasks::task_manager::task::impl {
protected:
    /// Reason this repair was started; rendered as the task type.
    streaming::stream_reason _reason;

public:
    repair_task_impl(tasks::task_manager::module_ptr module,
                     tasks::task_id id,
                     unsigned sequence_number,
                     std::string scope,
                     std::string keyspace,
                     std::string table,
                     std::string entity,
                     tasks::task_id parent_id,
                     streaming::stream_reason reason) noexcept
        : tasks::task_manager::task::impl(module,
                                          id,
                                          sequence_number,
                                          std::move(scope),
                                          std::move(keyspace),
                                          std::move(table),
                                          std::move(entity),
                                          parent_id)
        , _reason(reason)
    {
        // Progress of every repair task is counted in token ranges.
        _status.progress_units = "ranges";
    }

    /// Returns the textual form of the stream reason.
    std::string type() const override {
        return format("{}", _reason);
    }

protected:
    /// Builds a repair_uniq_id out of this task's sequence number, id and shard.
    repair_uniq_id get_repair_uniq_id() const noexcept {
        return repair_uniq_id{
            .id = _status.sequence_number,
            .task_info = tasks::task_info(_status.id, _status.shard),
        };
    }

    future<> run() override = 0;
};
/// Top-level repair task created for a user-requested keyspace repair.
///
/// Always uses streaming::stream_reason::repair, has no parent task, and is
/// reported as abortable.
class user_requested_repair_task_impl : public repair_task_impl {
private:
    lw_shared_ptr<locator::global_static_effective_replication_map> _germs;
    std::vector<sstring> _cfs;
    dht::token_range_vector _ranges;
    std::vector<sstring> _hosts;
    std::vector<sstring> _data_centers;
    std::unordered_set<locator::host_id> _ignore_nodes;
    bool _small_table_optimization;
    std::optional<int> _ranges_parallelism;
    gms::gossiper& _gossiper;
public:
    user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_static_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<locator::host_id> ignore_nodes, bool small_table_optimization, std::optional<int> ranges_parallelism, gms::gossiper& gossiper) noexcept
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
        // `germs` is a by-value sink parameter: move it instead of copying,
        // which would cost an extra reference-count increment/decrement.
        , _germs(std::move(germs))
        , _cfs(std::move(cfs))
        , _ranges(std::move(ranges))
        , _hosts(std::move(hosts))
        , _data_centers(std::move(data_centers))
        , _ignore_nodes(std::move(ignore_nodes))
        , _small_table_optimization(small_table_optimization)
        , _ranges_parallelism(ranges_parallelism)
        , _gossiper(gossiper)
    {}

    /// User-requested repairs can always be aborted.
    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable::yes;
    }
    tasks::is_user_task is_user_task() const noexcept override;
protected:
    future<> run() override;
    virtual future<std::optional<double>> expected_total_workload() const override;
};
/// Repair task that synchronizes data for `_ranges` with the given per-range
/// neighbors. The stream reason is supplied by the creator.
class data_sync_repair_task_impl : public repair_task_impl {
private:
    dht::token_range_vector _ranges;
    std::unordered_map<dht::token_range, repair_neighbors> _neighbors;
    // Engaged only when the creator supplied an abort source via node_ops_info.
    optimized_optional<abort_source::subscription> _abort_subscription;
    size_t _cfs_size = 0;

public:
    data_sync_repair_task_impl(tasks::task_manager::module_ptr module,
                               repair_uniq_id id,
                               std::string keyspace,
                               std::string entity,
                               dht::token_range_vector ranges,
                               std::unordered_map<dht::token_range, repair_neighbors> neighbors,
                               streaming::stream_reason reason,
                               shared_ptr<node_ops_info> ops_info)
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), reason)
        , _ranges(std::move(ranges))
        , _neighbors(std::move(neighbors))
    {
        // Nothing to hook up unless an external abort source was provided.
        if (!ops_info || !ops_info->as) {
            return;
        }
        // Propagate an abort of the external source to this task.
        _abort_subscription = ops_info->as->subscribe([this] () noexcept {
            abort();
        });
    }

    /// Reports the task as abortable only when it is not already tied to an
    /// external abort source (i.e. no subscription was registered).
    tasks::is_abortable is_abortable() const noexcept override {
        const bool has_external_abort_source = bool(_abort_subscription);
        return tasks::is_abortable(!has_external_abort_source);
    }

protected:
    future<> run() override;
    future<std::optional<double>> expected_total_workload() const override;
};
/// Repair task covering the tablets described by `_metas` for the tables in
/// `_tables` of keyspace `_keyspace`.
class tablet_repair_task_impl : public repair_task_impl {
private:
    sstring _keyspace;
    std::vector<sstring> _tables;
    std::vector<tablet_repair_task_meta> _metas;
    optimized_optional<abort_source::subscription> _abort_subscription;
    std::optional<int> _ranges_parallelism;
    size_t _metas_size = 0;
    gc_clock::time_point _flush_time{};
    // When true, get_flush_time() throws instead of returning _flush_time.
    // NOTE(review): the flag is set outside this header, presumably in run().
    bool _should_flush_and_flush_failed = false;
    service::frozen_topology_guard _topo_guard;
    bool _skip_flush;

public:
    tablet_repair_sched_info sched_info;

    tablet_repair_task_impl(tasks::task_manager::module_ptr module,
                            repair_uniq_id id,
                            sstring keyspace,
                            tasks::task_id parent_id,
                            std::vector<sstring> tables,
                            streaming::stream_reason reason,
                            std::vector<tablet_repair_task_meta> metas,
                            std::optional<int> ranges_parallelism,
                            service::frozen_topology_guard topo_guard,
                            tablet_repair_sched_info sched_info,
                            bool skip_flush = false)
        // The base class receives a copy of `keyspace`; it is moved into
        // `_keyspace` only afterwards.
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", keyspace, "", "", parent_id, reason)
        , _keyspace(std::move(keyspace))
        , _tables(std::move(tables))
        , _metas(std::move(metas))
        , _ranges_parallelism(ranges_parallelism)
        , _topo_guard(topo_guard)
        , _skip_flush(skip_flush)
        , sched_info(std::move(sched_info))
    {}

    /// Returns the recorded flush time.
    /// @throws std::runtime_error if a flush was needed but failed.
    gc_clock::time_point get_flush_time() const {
        if (!_should_flush_and_flush_failed) {
            return _flush_time;
        }
        throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
    }

    /// Abortable only while no abort subscription is registered.
    tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable(!_abort_subscription);
    }

    tasks::is_user_task is_user_task() const noexcept override;
    future<> release_resources() noexcept override;

protected:
    future<> run() override;
    future<std::optional<double>> expected_total_workload() const override;

private:
    size_t get_metas_size() const noexcept;
};
class shard_repair_task_impl : public repair_task_impl {
public:
repair_service& rs;
seastar::sharded<replica::database>& db;
seastar::sharded<netw::messaging_service>& messaging;
service::migration_manager& mm;
gms::gossiper& gossiper;
private:
locator::effective_replication_map_ptr erm;
public:
dht::token_range_vector ranges;
std::vector<sstring> cfs;
std::vector<table_id> table_ids;
repair_uniq_id global_repair_id;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
std::unordered_set<locator::host_id> ignore_nodes;
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
uint64_t nr_ranges_finished = 0;
size_t nr_failed_ranges = 0;
int ranges_index = 0;
repair_stats _stats;
std::unordered_set<sstring> dropped_tables;
bool _hints_batchlog_flushed = false;
std::unordered_set<locator::host_id> nodes_down;
bool _small_table_optimization = false;
size_t small_table_optimization_ranges_reduced_factor = 1;
private:
bool _aborted = false;
std::optional<sstring> _failed_because;
std::optional<semaphore> _user_ranges_parallelism;
uint64_t _ranges_complete = 0;
gc_clock::time_point _flush_time;
service::frozen_topology_guard _frozen_topology_guard;
service::topology_guard _topology_guard = {service::null_topology_guard};
public:
tablet_repair_sched_info sched_info;
public:
shard_repair_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
sstring keyspace,
repair_service& repair,
locator::effective_replication_map_ptr erm_,
dht::token_range_vector ranges_,
std::vector<table_id> table_ids_,
repair_uniq_id parent_id_,
std::vector<sstring> data_centers_,
std::vector<sstring> hosts_,
std::unordered_set<locator::host_id> ignore_nodes_,
std::unordered_map<dht::token_range, repair_neighbors> neighbors_,
streaming::stream_reason reason_,
bool hints_batchlog_flushed,
bool small_table_optimization,
std::optional<int> ranges_parallelism,
gc_clock::time_point flush_time,
service::frozen_topology_guard topo_guard,
tablet_repair_sched_info sched_info = tablet_repair_sched_info(),
size_t small_table_optimization_ranges_reduced_factor_ = 1);
void check_failed_ranges();
void check_in_abort_or_shutdown();
repair_neighbors get_repair_neighbors(const dht::token_range& range);
gc_clock::time_point get_flush_time() const { return _flush_time; }
void update_statistics(const repair_stats& stats) {
_stats.add(stats);
}
const std::vector<sstring>& table_names() {
return cfs;
}
const std::string& get_keyspace() const noexcept {
return _status.keyspace;
}
streaming::stream_reason reason() const noexcept {
return _reason;
}
bool hints_batchlog_flushed() const {
return _hints_batchlog_flushed;
}
locator::effective_replication_map_ptr get_erm();
size_t get_total_rf() {
return get_erm()->get_replication_factor();
}
future<> repair_range(const dht::token_range& range, table_info table);
size_t ranges_size() const noexcept;
virtual future<> release_resources() noexcept override;
protected:
future<> do_repair_ranges();
virtual future<tasks::task_manager::task::progress> get_progress() const override;
future<> run() override;
};
// The repair::task_manager_module tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anyway).
class task_manager_module : public tasks::task_manager::module {
private:
    repair_service& _rs;
    // Repair statuses, presumably keyed by repair id (see get(int)).
    // Note that there are no "SUCCESSFUL" entries in the status map:
    // successfully-finished repairs are those with an id <= the module's
    // current sequence number (ids come from new_sequence_number()) that
    // are not listed as running or failed in the status map.
    std::unordered_map<int, repair_status> _status;
    // Maps a repair id to the task id registered for it through
    // add_shard_task_id(); looked up by get_shard_task_ptr().
    std::unordered_map<int, tasks::task_id> _repairs;
    std::unordered_set<tasks::task_id> _pending_repairs;
    // The semaphore used to control the maximum number of
    // ranges that can be repaired in parallel.
    named_semaphore _range_parallelism_semaphore;
    seastar::condition_variable _done_cond;
    void start(repair_uniq_id id);
    void done(repair_uniq_id id, bool succeeded);
public:
    // 32 MiB.
    static constexpr size_t max_repair_memory_per_range = 32 * 1024 * 1024;
    task_manager_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;
    repair_service& get_repair_service() noexcept {
        return _rs;
    }
    // Allocates a fresh repair id: the next sequence number plus a random
    // task id on the current shard.
    repair_uniq_id new_repair_uniq_id() noexcept {
        return repair_uniq_id{
            .id = new_sequence_number(),
            .task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
        };
    }
    repair_status get(int id) const;
    void check_in_shutdown();
    void add_shard_task_id(int id, tasks::task_id ri);
    void remove_shard_task_id(int id);
    tasks::task_manager::task_ptr get_shard_task_ptr(int id);
    std::vector<int> get_active() const;
    size_t nr_running_repair_jobs();
    void abort_all_repairs();
    named_semaphore& range_parallelism_semaphore();
    future<> run(repair_uniq_id id, std::function<void ()> func);
    future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
    float report_progress();
    future<bool> is_aborted(const tasks::task_id& uuid, shard_id shard);
};
}