Merge 'Subscribe repair_info::abort on node_ops_meta_data::abort_source' from Pavel Emelyanov

The storage_service::stop() calls repair_service::abort_repair_node_ops(), but at that time the sharded<repair_service> is already stopped, and calling .local() on it just crashes.

The suggested fix is to remove the explicit storage_service -> repair_service kick. Instead, the repair_infos generated for the sake of node-ops subscribe to the node_ops_meta_data's abort source and abort themselves automatically.

fixes: #10284

Closes #11797

* github.com:scylladb/scylladb:
  repair: Remove ops_uuid
  repair: Remove abort_repair_node_ops() altogether
  repair: Subscribe on node_ops_info::as abortion
  repair: Keep abort source on node_ops_info
  repair: Pass node_ops_info arg to do_sync_data_using_repair()
  repair: Mark repair_info::abort() noexcept
  node_ops: Remove _aborted bit
  node_ops: Simplify construction of node_ops_metadata
  main: Fix message about repair service starting
This commit is contained in:
Botond Dénes
2022-10-21 10:08:43 +03:00
6 changed files with 33 additions and 64 deletions

View File

@@ -1318,7 +1318,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// ATTN -- sharded repair reference already sits on storage_service and if
// it calls repair.local() before this place it'll crash (now it doesn't do
// both)
supervisor::notify("starting messaging service");
supervisor::notify("starting repair service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {

View File

@@ -50,7 +50,7 @@
logging::logger rlogger("repair");
void node_ops_info::check_abort() {
if (abort) {
if (as && as->abort_requested()) {
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
@@ -444,16 +444,6 @@ void tracker::abort_all_repairs() {
rlogger.info0("Aborted {} repair job(s), aborted={}", _aborted_pending_repairs.size(), _aborted_pending_repairs);
}
void tracker::abort_repair_node_ops(utils::UUID ops_uuid) {
for (auto& x : _repairs) {
auto& ri = x.second;
if (ri->ops_uuid() && ri->ops_uuid().value() == ops_uuid) {
rlogger.info0("Aborted repair jobs for ops_uuid={}", ops_uuid);
ri->abort();
}
}
}
float tracker::report_progress(streaming::stream_reason reason) {
uint64_t nr_ranges_finished = 0;
uint64_t nr_ranges_total = 0;
@@ -542,7 +532,7 @@ repair_info::repair_info(repair_service& repair,
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
std::optional<utils::UUID> ops_uuid,
shared_ptr<abort_source> as,
bool hints_batchlog_flushed)
: rs(repair)
, db(repair.get_db())
@@ -564,8 +554,10 @@ repair_info::repair_info(repair_service& repair,
, reason(reason_)
, total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor())
, nr_ranges_total(ranges.size())
, _ops_uuid(std::move(ops_uuid))
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) {
if (as != nullptr) {
_abort_subscription = as->subscribe([this] () noexcept { abort(); });
}
}
void repair_info::check_failed_ranges() {
@@ -583,7 +575,7 @@ void repair_info::check_failed_ranges() {
}
}
void repair_info::abort() {
void repair_info::abort() noexcept {
aborted = true;
}
@@ -1199,7 +1191,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, nullptr, hints_batchlog_flushed);
return repair_ranges(ri);
});
repair_results.push_back(std::move(f));
@@ -1266,12 +1258,12 @@ future<> repair_service::sync_data_using_repair(
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid) {
shared_ptr<node_ops_info> ops_info) {
if (ranges.empty()) {
return make_ready_future<>();
}
return container().invoke_on(0, [keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] (repair_service& local_repair) mutable {
return local_repair.do_sync_data_using_repair(std::move(keyspace), std::move(ranges), std::move(neighbors), reason, ops_uuid);
return container().invoke_on(0, [keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] (repair_service& local_repair) mutable {
return local_repair.do_sync_data_using_repair(std::move(keyspace), std::move(ranges), std::move(neighbors), reason, ops_info);
});
}
@@ -1280,12 +1272,12 @@ future<> repair_service::do_sync_data_using_repair(
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid) {
shared_ptr<node_ops_info> ops_info) {
seastar::sharded<replica::database>& db = get_db();
repair_uniq_id id = repair_tracker().next_repair_command();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid, keyspace);
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
auto cfs = list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid, keyspace);
@@ -1298,14 +1290,14 @@ future<> repair_service::do_sync_data_using_repair(
throw std::runtime_error("aborted by user request");
}
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (repair_service& local_repair) mutable {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info] (repair_service& local_repair) mutable {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});
@@ -1504,7 +1496,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
}
}
auto nr_ranges = desired_ranges.size();
sync_data_using_repair(keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, {}).get();
sync_data_using_repair(keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
}
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | boost::adaptors::map_keys);
@@ -1695,8 +1687,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
ranges.swap(ranges_for_removenode);
}
auto nr_ranges_synced = ranges.size();
std::optional<utils::UUID> opt_uuid = ops ? std::make_optional<utils::UUID>(ops->ops_uuid) : std::nullopt;
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, opt_uuid).get();
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, ops).get();
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
}
@@ -1720,12 +1711,6 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt
});
}
future<> repair_service::abort_repair_node_ops(utils::UUID ops_uuid) {
return container().invoke_on_all([ops_uuid] (repair_service& rs) {
rs.repair_tracker().abort_repair_node_ops(ops_uuid);
});
}
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::list<gms::inet_address> ignore_nodes) {
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
seastar::sharded<replica::database>& db = get_db();
@@ -1802,7 +1787,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
}).get();
}
auto nr_ranges = ranges.size();
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, {}).get();
sync_data_using_repair(keyspace_name, std::move(ranges), std::move(range_sources), reason, nullptr).get();
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
}
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | boost::adaptors::map_keys, source_dc);

View File

@@ -69,7 +69,7 @@ std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
struct node_ops_info {
utils::UUID ops_uuid;
bool abort = false;
shared_ptr<abort_source> as;
std::list<gms::inet_address> ignore_nodes;
void check_abort();
};
@@ -167,7 +167,7 @@ public:
int ranges_index = 0;
repair_stats _stats;
std::unordered_set<sstring> dropped_tables;
std::optional<utils::UUID> _ops_uuid;
optimized_optional<abort_source::subscription> _abort_subscription;
bool _hints_batchlog_flushed = false;
public:
repair_info(repair_service& repair,
@@ -179,10 +179,10 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ingore_nodes_,
streaming::stream_reason reason_,
std::optional<utils::UUID> ops_uuid,
shared_ptr<abort_source> as,
bool hints_batchlog_flushed);
void check_failed_ranges();
void abort();
void abort() noexcept;
void check_in_abort();
void check_in_shutdown();
repair_neighbors get_repair_neighbors(const dht::token_range& range);
@@ -192,9 +192,6 @@ public:
const std::vector<sstring>& table_names() {
return cfs;
}
const std::optional<utils::UUID>& ops_uuid() const {
return _ops_uuid;
};
bool hints_batchlog_flushed() const {
return _hints_batchlog_flushed;
@@ -254,7 +251,6 @@ public:
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
void abort_repair_node_ops(utils::UUID ops_uuid);
bool is_aborted(const utils::UUID& uuid);
};

View File

@@ -146,13 +146,13 @@ private:
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid);
shared_ptr<node_ops_info> ops_info);
future<> do_sync_data_using_repair(sstring keyspace,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid);
shared_ptr<node_ops_info> ops_info);
future<repair_update_system_table_response> repair_update_system_table_handler(
gms::inet_address from,
@@ -198,8 +198,6 @@ public:
// Abort all the repairs
future<> abort_all();
future<> abort_repair_node_ops(utils::UUID ops_uuid);
std::unordered_map<node_repair_meta_id, repair_meta_ptr>& repair_meta_map() noexcept {
return _repair_metas;
}

View File

@@ -2524,8 +2524,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
}).get();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& node : req.leaving_nodes) {
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
@@ -2573,8 +2572,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
}).get();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& node : req.leaving_nodes) {
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
@@ -2616,8 +2614,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return make_ready_future<>();
}).get();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& x: req.replace_nodes) {
auto existing_node = x.first;
@@ -2671,8 +2668,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
}).get();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& x: req.bootstrap_nodes) {
auto& endpoint = x.first;
@@ -2929,7 +2925,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
auto ops_uuid = utils::make_random_uuid();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::list<gms::inet_address>()});
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
@@ -3654,7 +3650,7 @@ bool storage_service::is_repair_based_node_ops_enabled(streaming::stream_reason
node_ops_meta_data::node_ops_meta_data(
utils::UUID ops_uuid,
gms::inet_address coordinator,
shared_ptr<node_ops_info> ops,
std::list<gms::inet_address> ignore_nodes,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func)
: _ops_uuid(std::move(ops_uuid))
@@ -3662,24 +3658,20 @@ node_ops_meta_data::node_ops_meta_data(
, _abort(std::move(abort_func))
, _abort_source(seastar::make_shared<abort_source>())
, _signal(std::move(signal_func))
, _ops(std::move(ops))
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
, _watchdog([sig = _signal] { sig(); }) {
_watchdog.arm(_watchdog_interval);
}
future<> node_ops_meta_data::abort() {
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
_aborted = true;
if (_ops) {
_ops->abort = true;
}
_watchdog.cancel();
return _abort();
}
void node_ops_meta_data::update_watchdog() {
slogger.debug("node_ops_meta_data: ops_uuid={} update_watchdog", _ops_uuid);
if (_aborted) {
if (_abort_source->abort_requested()) {
return;
}
_watchdog.cancel();
@@ -3731,7 +3723,6 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
if (as && !as->abort_requested()) {
as->request_abort();
}
co_await _repair.local().abort_repair_node_ops(ops_uuid);
_node_ops.erase(it);
}
}

View File

@@ -98,12 +98,11 @@ class node_ops_meta_data {
shared_ptr<node_ops_info> _ops;
seastar::timer<lowres_clock> _watchdog;
std::chrono::seconds _watchdog_interval{120};
bool _aborted = false;
public:
explicit node_ops_meta_data(
utils::UUID ops_uuid,
gms::inet_address coordinator,
shared_ptr<node_ops_info> ops,
std::list<gms::inet_address> ignore_nodes,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func);
shared_ptr<node_ops_info> get_ops_info();