Previously, _flush_time was stored as a std::optional<gc_clock::time_point>, where std::nullopt indicated that a flush was needed but failed. This was confusing for the caller and did not work as expected, since _flush_time was always initialized with a value (never std::nullopt). Change the type of _flush_time to gc_clock::time_point. If a flush is needed but failed, get_flush_time() now throws an exception instead. This was supposed to be part of https://github.com/scylladb/scylladb/pull/26319 but was mistakenly overwritten during rebases.

Refs: https://github.com/scylladb/scylladb/issues/24415

Closes scylladb/scylladb#26794
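
To illustrate the new contract, a caller that previously had to test for std::nullopt now handles the failure as an exception. A minimal sketch, assuming a hypothetical task pointer and error handler (neither is code from the tree):

    // Hypothetical caller of tablet_repair_task_impl::get_flush_time().
    // Before this change the getter returned std::optional<gc_clock::time_point>,
    // with std::nullopt meaning "flush was needed but failed" -- a state that
    // was never actually produced, because _flush_time was initialized with a value.
    try {
        gc_clock::time_point flush_time = task->get_flush_time();
        repair_with_flush_time(flush_time); // hypothetical consumer
    } catch (const std::runtime_error& e) {
        // A flush was needed for this repair but failed; propagate the error.
        handle_flush_failure(e); // hypothetical error path
    }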
/*
 * Copyright (C) 2022-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include "node_ops/node_ops_ctl.hh"
#include "repair/repair.hh"
#include "service/topology_guard.hh"
#include "streaming/stream_reason.hh"
#include "tasks/task_manager.hh"

namespace repair {
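
// Base class for repair tasks registered with the task manager.
// Progress is reported in units of token ranges.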
class repair_task_impl : public tasks::task_manager::task::impl {
protected:
    streaming::stream_reason _reason;
public:
    repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string scope, std::string keyspace, std::string table, std::string entity, tasks::task_id parent_id, streaming::stream_reason reason) noexcept
        : tasks::task_manager::task::impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
        , _reason(reason) {
        _status.progress_units = "ranges";
    }

    virtual std::string type() const override {
        return format("{}", _reason);
    }
protected:
    repair_uniq_id get_repair_uniq_id() const noexcept {
        return repair_uniq_id{
            .id = _status.sequence_number,
            .task_info = tasks::task_info(_status.id, _status.shard)
        };
    }

    virtual future<> run() override = 0;
};
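
// Repair task backing a user-requested repair of a keyspace.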
class user_requested_repair_task_impl : public repair_task_impl {
private:
    lw_shared_ptr<locator::global_static_effective_replication_map> _germs;
    std::vector<sstring> _cfs;
    dht::token_range_vector _ranges;
    std::vector<sstring> _hosts;
    std::vector<sstring> _data_centers;
    std::unordered_set<locator::host_id> _ignore_nodes;
    bool _small_table_optimization;
    std::optional<int> _ranges_parallelism;
    gms::gossiper& _gossiper;
public:
    user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_static_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<locator::host_id> ignore_nodes, bool small_table_optimization, std::optional<int> ranges_parallelism, gms::gossiper& gossiper) noexcept
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
        , _germs(germs)
        , _cfs(std::move(cfs))
        , _ranges(std::move(ranges))
        , _hosts(std::move(hosts))
        , _data_centers(std::move(data_centers))
        , _ignore_nodes(std::move(ignore_nodes))
        , _small_table_optimization(small_table_optimization)
        , _ranges_parallelism(ranges_parallelism)
        , _gossiper(gossiper)
    {}

    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable::yes;
    }

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    future<> run() override;

    virtual future<std::optional<double>> expected_total_workload() const override;
};
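
// Repair task used to synchronize data as part of node operations;
// it is aborted through the node_ops abort source when one is provided.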
class data_sync_repair_task_impl : public repair_task_impl {
private:
    dht::token_range_vector _ranges;
    std::unordered_map<dht::token_range, repair_neighbors> _neighbors;
    optimized_optional<abort_source::subscription> _abort_subscription;
    size_t _cfs_size = 0;
public:
    data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, dht::token_range_vector ranges, std::unordered_map<dht::token_range, repair_neighbors> neighbors, streaming::stream_reason reason, shared_ptr<node_ops_info> ops_info)
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), reason)
        , _ranges(std::move(ranges))
        , _neighbors(std::move(neighbors))
    {
        if (ops_info && ops_info->as) {
            _abort_subscription = ops_info->as->subscribe([this] () noexcept {
                abort();
            });
        }
    }

    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable(!_abort_subscription);
    }
protected:
    future<> run() override;

    virtual future<std::optional<double>> expected_total_workload() const override;
};
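
// Repair task coordinating tablet repairs described by a set of
// tablet_repair_task_meta entries.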
class tablet_repair_task_impl : public repair_task_impl {
private:
    sstring _keyspace;
    std::vector<sstring> _tables;
    std::vector<tablet_repair_task_meta> _metas;
    optimized_optional<abort_source::subscription> _abort_subscription;
    std::optional<int> _ranges_parallelism;
    size_t _metas_size = 0;
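    // Time at which memtables were flushed for this repair. If a flush was
    // needed but failed, _should_flush_and_flush_failed is set and
    // get_flush_time() throws instead of returning a meaningless time point.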
    gc_clock::time_point _flush_time = gc_clock::time_point();
    bool _should_flush_and_flush_failed = false;
    service::frozen_topology_guard _topo_guard;
    bool _skip_flush;
public:
    tablet_repair_sched_info sched_info;
public:
    tablet_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, sstring keyspace, tasks::task_id parent_id, std::vector<sstring> tables, streaming::stream_reason reason, std::vector<tablet_repair_task_meta> metas, std::optional<int> ranges_parallelism, service::frozen_topology_guard topo_guard, tablet_repair_sched_info sched_info, bool skip_flush = false)
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", keyspace, "", "", parent_id, reason)
        , _keyspace(std::move(keyspace))
        , _tables(std::move(tables))
        , _metas(std::move(metas))
        , _ranges_parallelism(ranges_parallelism)
        , _topo_guard(topo_guard)
        , _skip_flush(skip_flush)
        , sched_info(std::move(sched_info))
    {
    }

    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable(!_abort_subscription);
    }
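
    // Returns the recorded flush time. Throws if a flush was needed but
    // failed, so a caller cannot mistake the default-constructed value
    // for a real flush time.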
    gc_clock::time_point get_flush_time() const {
        if (_should_flush_and_flush_failed) {
            throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
        }
        return _flush_time;
    }

    tasks::is_user_task is_user_task() const noexcept override;
    virtual future<> release_resources() noexcept override;
private:
    size_t get_metas_size() const noexcept;
protected:
    future<> run() override;

    virtual future<std::optional<double>> expected_total_workload() const override;
};
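
// Per-shard repair task that performs the actual range repairs on a single
// shard and tracks their statistics and failed ranges.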
class shard_repair_task_impl : public repair_task_impl {
public:
    repair_service& rs;
    seastar::sharded<replica::database>& db;
    seastar::sharded<netw::messaging_service>& messaging;
    service::migration_manager& mm;
    gms::gossiper& gossiper;
private:
    locator::effective_replication_map_ptr erm;
public:
    dht::token_range_vector ranges;
    std::vector<sstring> cfs;
    std::vector<table_id> table_ids;
    repair_uniq_id global_repair_id;
    std::vector<sstring> data_centers;
    std::vector<sstring> hosts;
    std::unordered_set<locator::host_id> ignore_nodes;
    std::unordered_map<dht::token_range, repair_neighbors> neighbors;
    uint64_t nr_ranges_finished = 0;
    size_t nr_failed_ranges = 0;
    int ranges_index = 0;
    repair_stats _stats;
    std::unordered_set<sstring> dropped_tables;
    bool _hints_batchlog_flushed = false;
    std::unordered_set<locator::host_id> nodes_down;
    bool _small_table_optimization = false;
    size_t small_table_optimization_ranges_reduced_factor = 1;
private:
    bool _aborted = false;
    std::optional<sstring> _failed_because;
    std::optional<semaphore> _user_ranges_parallelism;
    uint64_t _ranges_complete = 0;
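    // Flush time provided at construction; exposed via get_flush_time().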
    gc_clock::time_point _flush_time;
    service::frozen_topology_guard _frozen_topology_guard;
    service::topology_guard _topology_guard = {service::null_topology_guard};
public:
    tablet_repair_sched_info sched_info;
public:
    shard_repair_task_impl(tasks::task_manager::module_ptr module,
        tasks::task_id id,
        sstring keyspace,
        repair_service& repair,
        locator::effective_replication_map_ptr erm_,
        dht::token_range_vector ranges_,
        std::vector<table_id> table_ids_,
        repair_uniq_id parent_id_,
        std::vector<sstring> data_centers_,
        std::vector<sstring> hosts_,
        std::unordered_set<locator::host_id> ignore_nodes_,
        std::unordered_map<dht::token_range, repair_neighbors> neighbors_,
        streaming::stream_reason reason_,
        bool hints_batchlog_flushed,
        bool small_table_optimization,
        std::optional<int> ranges_parallelism,
        gc_clock::time_point flush_time,
        service::frozen_topology_guard topo_guard,
        tablet_repair_sched_info sched_info = tablet_repair_sched_info(),
        size_t small_table_optimization_ranges_reduced_factor_ = 1);
    void check_failed_ranges();
    void check_in_abort_or_shutdown();
    repair_neighbors get_repair_neighbors(const dht::token_range& range);
    gc_clock::time_point get_flush_time() const { return _flush_time; }
    void update_statistics(const repair_stats& stats) {
        _stats.add(stats);
    }
    const std::vector<sstring>& table_names() {
        return cfs;
    }
    const std::string& get_keyspace() const noexcept {
        return _status.keyspace;
    }
    streaming::stream_reason reason() const noexcept {
        return _reason;
    }

    bool hints_batchlog_flushed() const {
        return _hints_batchlog_flushed;
    }

    locator::effective_replication_map_ptr get_erm();

    size_t get_total_rf() {
        return get_erm()->get_replication_factor();
    }

    future<> repair_range(const dht::token_range& range, table_info table);

    size_t ranges_size() const noexcept;

    virtual future<> release_resources() noexcept override;
protected:
    future<> do_repair_ranges();
    virtual future<tasks::task_manager::task::progress> get_progress() const override;
    future<> run() override;
};

// The repair::task_manager_module tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anyway).
class task_manager_module : public tasks::task_manager::module {
private:
    repair_service& _rs;
    // Note that there are no "SUCCESSFUL" entries in the "status" map:
    // Successfully-finished repairs are those with id <= repair_module::_sequence_number
    // but aren't listed as running or failed in the status map.
    std::unordered_map<int, repair_status> _status;
    // Maps a repair id to the task id of the corresponding repair task.
    std::unordered_map<int, tasks::task_id> _repairs;
    std::unordered_set<tasks::task_id> _pending_repairs;
    // The semaphore used to control the maximum number of
    // ranges that can be repaired in parallel.
    named_semaphore _range_parallelism_semaphore;
    seastar::condition_variable _done_cond;
    void start(repair_uniq_id id);
    void done(repair_uniq_id id, bool succeeded);
public:
    static constexpr size_t max_repair_memory_per_range = 32 * 1024 * 1024;

    task_manager_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;

    repair_service& get_repair_service() noexcept {
        return _rs;
    }

    repair_uniq_id new_repair_uniq_id() noexcept {
        return repair_uniq_id{
            .id = new_sequence_number(),
            .task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
        };
    }

    repair_status get(int id) const;
    void check_in_shutdown();
    void add_shard_task_id(int id, tasks::task_id ri);
    void remove_shard_task_id(int id);
    tasks::task_manager::task_ptr get_shard_task_ptr(int id);
    std::vector<int> get_active() const;
    size_t nr_running_repair_jobs();
    void abort_all_repairs();
    named_semaphore& range_parallelism_semaphore();
    future<> run(repair_uniq_id id, std::function<void ()> func);
    future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
    float report_progress();
    future<bool> is_aborted(const tasks::task_id& uuid, shard_id shard);
};

}