Merge 'Subscribe repair_info::abort on node_ops_meta_data::abort_source' from Pavel Emelyanov
storage_service::stop() calls repair_service::abort_repair_node_ops(), but by that time the sharded<repair_service> is already stopped, and calling .local() on it just crashes. The suggested fix is to remove the explicit storage_service -> repair_service kick. Instead, the repair_infos generated for the sake of node-ops are subscribed to the node_ops_meta_data's abort source and abort themselves automatically.

fixes: #10284
Closes #11797

* github.com:scylladb/scylladb:
  repair: Remove ops_uuid
  repair: Remove abort_repair_node_ops() altogether
  repair: Subscribe on node_ops_info::as abortion
  repair: Keep abort source on node_ops_info
  repair: Pass node_ops_info arg to do_sync_data_using_repair()
  repair: Mark repair_info::abort() noexcept
  node_ops: Remove _aborted bit
  node_ops: Simplify construction of node_ops_metadata
  main: Fix message about repair service starting
This commit is contained in:
2
main.cc
2
main.cc
@@ -1318,7 +1318,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// ATTN -- sharded repair reference already sits on storage_service and if
|
||||
// it calls repair.local() before this place it'll crash (now it doesn't do
|
||||
// both)
|
||||
supervisor::notify("starting messaging service");
|
||||
supervisor::notify("starting repair service");
|
||||
auto max_memory_repair = memory::stats().total_memory() * 0.1;
|
||||
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
|
||||
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
|
||||
|
||||
@@ -50,7 +50,7 @@
|
||||
logging::logger rlogger("repair");
|
||||
|
||||
void node_ops_info::check_abort() {
|
||||
if (abort) {
|
||||
if (as && as->abort_requested()) {
|
||||
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
|
||||
rlogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
@@ -444,16 +444,6 @@ void tracker::abort_all_repairs() {
|
||||
rlogger.info0("Aborted {} repair job(s), aborted={}", _aborted_pending_repairs.size(), _aborted_pending_repairs);
|
||||
}
|
||||
|
||||
void tracker::abort_repair_node_ops(utils::UUID ops_uuid) {
|
||||
for (auto& x : _repairs) {
|
||||
auto& ri = x.second;
|
||||
if (ri->ops_uuid() && ri->ops_uuid().value() == ops_uuid) {
|
||||
rlogger.info0("Aborted repair jobs for ops_uuid={}", ops_uuid);
|
||||
ri->abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
float tracker::report_progress(streaming::stream_reason reason) {
|
||||
uint64_t nr_ranges_finished = 0;
|
||||
uint64_t nr_ranges_total = 0;
|
||||
@@ -542,7 +532,7 @@ repair_info::repair_info(repair_service& repair,
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
std::optional<utils::UUID> ops_uuid,
|
||||
shared_ptr<abort_source> as,
|
||||
bool hints_batchlog_flushed)
|
||||
: rs(repair)
|
||||
, db(repair.get_db())
|
||||
@@ -564,8 +554,10 @@ repair_info::repair_info(repair_service& repair,
|
||||
, reason(reason_)
|
||||
, total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor())
|
||||
, nr_ranges_total(ranges.size())
|
||||
, _ops_uuid(std::move(ops_uuid))
|
||||
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) {
|
||||
if (as != nullptr) {
|
||||
_abort_subscription = as->subscribe([this] () noexcept { abort(); });
|
||||
}
|
||||
}
|
||||
|
||||
void repair_info::check_failed_ranges() {
|
||||
@@ -583,7 +575,7 @@ void repair_info::check_failed_ranges() {
|
||||
}
|
||||
}
|
||||
|
||||
void repair_info::abort() {
|
||||
void repair_info::abort() noexcept {
|
||||
aborted = true;
|
||||
}
|
||||
|
||||
@@ -1199,7 +1191,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
|
||||
auto ri = make_lw_shared<repair_info>(local_repair,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid, hints_batchlog_flushed);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, nullptr, hints_batchlog_flushed);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1266,12 +1258,12 @@ future<> repair_service::sync_data_using_repair(
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
std::optional<utils::UUID> ops_uuid) {
|
||||
shared_ptr<node_ops_info> ops_info) {
|
||||
if (ranges.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return container().invoke_on(0, [keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] (repair_service& local_repair) mutable {
|
||||
return local_repair.do_sync_data_using_repair(std::move(keyspace), std::move(ranges), std::move(neighbors), reason, ops_uuid);
|
||||
return container().invoke_on(0, [keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] (repair_service& local_repair) mutable {
|
||||
return local_repair.do_sync_data_using_repair(std::move(keyspace), std::move(ranges), std::move(neighbors), reason, ops_info);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1280,12 +1272,12 @@ future<> repair_service::do_sync_data_using_repair(
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
std::optional<utils::UUID> ops_uuid) {
|
||||
shared_ptr<node_ops_info> ops_info) {
|
||||
seastar::sharded<replica::database>& db = get_db();
|
||||
|
||||
repair_uniq_id id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid, keyspace);
|
||||
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
|
||||
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
|
||||
auto cfs = list_column_families(db.local(), keyspace);
|
||||
if (cfs.empty()) {
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid, keyspace);
|
||||
@@ -1298,14 +1290,14 @@ future<> repair_service::do_sync_data_using_repair(
|
||||
throw std::runtime_error("aborted by user request");
|
||||
}
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (repair_service& local_repair) mutable {
|
||||
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info] (repair_service& local_repair) mutable {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
||||
bool hints_batchlog_flushed = false;
|
||||
auto ri = make_lw_shared<repair_info>(local_repair,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid, hints_batchlog_flushed);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
@@ -1504,7 +1496,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
}
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, {}).get();
|
||||
sync_data_using_repair(keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | boost::adaptors::map_keys);
|
||||
@@ -1695,8 +1687,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
ranges.swap(ranges_for_removenode);
|
||||
}
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
std::optional<utils::UUID> opt_uuid = ops ? std::make_optional<utils::UUID>(ops->ops_uuid) : std::nullopt;
|
||||
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, opt_uuid).get();
|
||||
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, ops).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||
}
|
||||
@@ -1720,12 +1711,6 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::abort_repair_node_ops(utils::UUID ops_uuid) {
|
||||
return container().invoke_on_all([ops_uuid] (repair_service& rs) {
|
||||
rs.repair_tracker().abort_repair_node_ops(ops_uuid);
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list<gms::inet_address> ignore_nodes) {
|
||||
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
|
||||
seastar::sharded<replica::database>& db = get_db();
|
||||
@@ -1802,7 +1787,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
}).get();
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, {}).get();
|
||||
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, nullptr).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | boost::adaptors::map_keys, source_dc);
|
||||
|
||||
@@ -69,7 +69,7 @@ std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
|
||||
|
||||
struct node_ops_info {
|
||||
utils::UUID ops_uuid;
|
||||
bool abort = false;
|
||||
shared_ptr<abort_source> as;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
void check_abort();
|
||||
};
|
||||
@@ -167,7 +167,7 @@ public:
|
||||
int ranges_index = 0;
|
||||
repair_stats _stats;
|
||||
std::unordered_set<sstring> dropped_tables;
|
||||
std::optional<utils::UUID> _ops_uuid;
|
||||
optimized_optional<abort_source::subscription> _abort_subscription;
|
||||
bool _hints_batchlog_flushed = false;
|
||||
public:
|
||||
repair_info(repair_service& repair,
|
||||
@@ -179,10 +179,10 @@ public:
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ingore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
std::optional<utils::UUID> ops_uuid,
|
||||
shared_ptr<abort_source> as,
|
||||
bool hints_batchlog_flushed);
|
||||
void check_failed_ranges();
|
||||
void abort();
|
||||
void abort() noexcept;
|
||||
void check_in_abort();
|
||||
void check_in_shutdown();
|
||||
repair_neighbors get_repair_neighbors(const dht::token_range& range);
|
||||
@@ -192,9 +192,6 @@ public:
|
||||
const std::vector<sstring>& table_names() {
|
||||
return cfs;
|
||||
}
|
||||
const std::optional<utils::UUID>& ops_uuid() const {
|
||||
return _ops_uuid;
|
||||
};
|
||||
|
||||
bool hints_batchlog_flushed() const {
|
||||
return _hints_batchlog_flushed;
|
||||
@@ -254,7 +251,6 @@ public:
|
||||
future<> run(repair_uniq_id id, std::function<void ()> func);
|
||||
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
|
||||
float report_progress(streaming::stream_reason reason);
|
||||
void abort_repair_node_ops(utils::UUID ops_uuid);
|
||||
bool is_aborted(const utils::UUID& uuid);
|
||||
};
|
||||
|
||||
|
||||
@@ -146,13 +146,13 @@ private:
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
std::optional<utils::UUID> ops_uuid);
|
||||
shared_ptr<node_ops_info> ops_info);
|
||||
|
||||
future<> do_sync_data_using_repair(sstring keyspace,
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
std::optional<utils::UUID> ops_uuid);
|
||||
shared_ptr<node_ops_info> ops_info);
|
||||
|
||||
future<repair_update_system_table_response> repair_update_system_table_handler(
|
||||
gms::inet_address from,
|
||||
@@ -198,8 +198,6 @@ public:
|
||||
// Abort all the repairs
|
||||
future<> abort_all();
|
||||
|
||||
future<> abort_repair_node_ops(utils::UUID ops_uuid);
|
||||
|
||||
std::unordered_map<node_repair_meta_id, repair_meta_ptr>& repair_meta_map() noexcept {
|
||||
return _repair_metas;
|
||||
}
|
||||
|
||||
@@ -2524,8 +2524,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
@@ -2573,8 +2572,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
@@ -2616,8 +2614,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
@@ -2671,8 +2668,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
@@ -2929,7 +2925,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::list<gms::inet_address>()});
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
|
||||
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
|
||||
return send_replication_notification(notify_endpoint);
|
||||
});
|
||||
@@ -3654,7 +3650,7 @@ bool storage_service::is_repair_based_node_ops_enabled(streaming::stream_reason
|
||||
node_ops_meta_data::node_ops_meta_data(
|
||||
utils::UUID ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
shared_ptr<node_ops_info> ops,
|
||||
std::list<gms::inet_address> ignore_nodes,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func)
|
||||
: _ops_uuid(std::move(ops_uuid))
|
||||
@@ -3662,24 +3658,20 @@ node_ops_meta_data::node_ops_meta_data(
|
||||
, _abort(std::move(abort_func))
|
||||
, _abort_source(seastar::make_shared<abort_source>())
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(std::move(ops))
|
||||
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::abort() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
|
||||
_aborted = true;
|
||||
if (_ops) {
|
||||
_ops->abort = true;
|
||||
}
|
||||
_watchdog.cancel();
|
||||
return _abort();
|
||||
}
|
||||
|
||||
void node_ops_meta_data::update_watchdog() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} update_watchdog", _ops_uuid);
|
||||
if (_aborted) {
|
||||
if (_abort_source->abort_requested()) {
|
||||
return;
|
||||
}
|
||||
_watchdog.cancel();
|
||||
@@ -3731,7 +3723,6 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
co_await _repair.local().abort_repair_node_ops(ops_uuid);
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,12 +98,11 @@ class node_ops_meta_data {
|
||||
shared_ptr<node_ops_info> _ops;
|
||||
seastar::timer<lowres_clock> _watchdog;
|
||||
std::chrono::seconds _watchdog_interval{120};
|
||||
bool _aborted = false;
|
||||
public:
|
||||
explicit node_ops_meta_data(
|
||||
utils::UUID ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
shared_ptr<node_ops_info> ops,
|
||||
std::list<gms::inet_address> ignore_nodes,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
|
||||
Reference in New Issue
Block a user