Merge 'storage_service, repair: use per-shard abort_source' from Benny Halevy
Prevent copying shared_ptr across shards in do_sync_data_using_repair by allocating a shared_ptr<abort_source> per shard in node_ops_meta_data and respectively in node_ops_info. Fixes #11826 Signed-off-by: Benny Halevy <bhalevy@scylladb.com> Closes #11827 * github.com:scylladb/scylladb: repair: use sharded abort_source to abort repair_info repair: node_ops_info: add start and stop methods storage_service: node_ops_abort_thread: abort all node ops on shutdown storage_service: node_ops_abort_thread: co_return only after printing log message storage_service: node_ops_meta_data: add start and stop methods repair: node_ops_info: prevent accidental copy
This commit is contained in:
@@ -49,6 +49,12 @@
|
||||
|
||||
logging::logger rlogger("repair");
|
||||
|
||||
node_ops_info::node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept
|
||||
: ops_uuid(ops_uuid_)
|
||||
, as(std::move(as_))
|
||||
, ignore_nodes(std::move(ignore_nodes_))
|
||||
{}
|
||||
|
||||
void node_ops_info::check_abort() {
|
||||
if (as && as->abort_requested()) {
|
||||
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
|
||||
@@ -57,6 +63,28 @@ void node_ops_info::check_abort() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> node_ops_info::start() {
|
||||
if (as) {
|
||||
co_await _sas.start();
|
||||
_abort_subscription = as->subscribe([this] () noexcept {
|
||||
_abort_done = _sas.invoke_on_all([] (abort_source& as) noexcept {
|
||||
as.request_abort();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<> node_ops_info::stop() noexcept {
|
||||
if (as) {
|
||||
co_await std::exchange(_abort_done, make_ready_future<>());
|
||||
co_await _sas.stop();
|
||||
}
|
||||
}
|
||||
|
||||
abort_source* node_ops_info::local_abort_source() {
|
||||
return as ? &_sas.local() : nullptr;
|
||||
}
|
||||
|
||||
node_ops_metrics::node_ops_metrics(tracker& tracker)
|
||||
: _tracker(tracker)
|
||||
{
|
||||
@@ -532,7 +560,7 @@ repair_info::repair_info(repair_service& repair,
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
shared_ptr<abort_source> as,
|
||||
abort_source* as,
|
||||
bool hints_batchlog_flushed)
|
||||
: rs(repair)
|
||||
, db(repair.get_db())
|
||||
@@ -1295,9 +1323,10 @@ future<> repair_service::do_sync_data_using_repair(
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
||||
bool hints_batchlog_flushed = false;
|
||||
abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
|
||||
auto ri = make_lw_shared<repair_info>(local_repair,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
|
||||
@@ -67,11 +67,28 @@ struct repair_uniq_id {
|
||||
};
|
||||
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
|
||||
|
||||
struct node_ops_info {
|
||||
class node_ops_info {
|
||||
public:
|
||||
utils::UUID ops_uuid;
|
||||
shared_ptr<abort_source> as;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
|
||||
private:
|
||||
optimized_optional<abort_source::subscription> _abort_subscription;
|
||||
sharded<abort_source> _sas;
|
||||
future<> _abort_done = make_ready_future<>();
|
||||
|
||||
public:
|
||||
node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
|
||||
node_ops_info(const node_ops_info&) = delete;
|
||||
node_ops_info(node_ops_info&&) = delete;
|
||||
|
||||
future<> start();
|
||||
future<> stop() noexcept;
|
||||
|
||||
void check_abort();
|
||||
|
||||
abort_source* local_abort_source();
|
||||
};
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
@@ -179,7 +196,7 @@ public:
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ingore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
shared_ptr<abort_source> as,
|
||||
abort_source* as,
|
||||
bool hints_batchlog_flushed);
|
||||
void check_failed_ranges();
|
||||
void abort() noexcept;
|
||||
|
||||
@@ -2539,6 +2539,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -2587,6 +2588,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -2631,6 +2633,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
||||
// Wait for local node has marked replacing node as alive
|
||||
@@ -2685,6 +2688,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -2930,7 +2934,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
|
||||
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, nullptr, std::list<gms::inet_address>());
|
||||
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
|
||||
return send_replication_notification(notify_endpoint);
|
||||
});
|
||||
@@ -3663,11 +3667,19 @@ node_ops_meta_data::node_ops_meta_data(
|
||||
, _abort(std::move(abort_func))
|
||||
, _abort_source(seastar::make_shared<abort_source>())
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
|
||||
, _ops(seastar::make_shared<node_ops_info>(_ops_uuid, _abort_source, std::move(ignore_nodes)))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::start() {
|
||||
return _ops ? _ops->start() : make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::stop() noexcept {
|
||||
return _ops ? _ops->stop() : make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::abort() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
|
||||
_watchdog.cancel();
|
||||
@@ -3713,6 +3725,7 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.cancel_watchdog();
|
||||
co_await meta.stop();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
@@ -3720,6 +3733,24 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
|
||||
|
||||
if (!ops_uuid) {
|
||||
for (auto& [uuid, meta] : _node_ops) {
|
||||
co_await meta.abort();
|
||||
auto as = meta.get_abort_source();
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
}
|
||||
|
||||
for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
co_await meta.stop();
|
||||
}
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3728,6 +3759,7 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
co_await meta.stop();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
@@ -3746,17 +3778,18 @@ future<> storage_service::node_ops_abort_thread() {
|
||||
while (!_node_ops_abort_queue.empty()) {
|
||||
auto uuid_opt = _node_ops_abort_queue.front();
|
||||
_node_ops_abort_queue.pop_front();
|
||||
if (!uuid_opt) {
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await node_ops_abort(*uuid_opt);
|
||||
co_await node_ops_abort(uuid_opt.value_or(utils::null_uuid()));
|
||||
} catch (...) {
|
||||
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
|
||||
}
|
||||
if (!uuid_opt) {
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
}
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -105,6 +105,8 @@ public:
|
||||
std::list<gms::inet_address> ignore_nodes,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
future<> start();
|
||||
future<> stop() noexcept;
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
shared_ptr<abort_source> get_abort_source();
|
||||
future<> abort();
|
||||
|
||||
Reference in New Issue
Block a user