Merge 'storage_service, repair: use per-shard abort_source' from Benny Halevy

Prevent copying a shared_ptr across shards in do_sync_data_using_repair
by giving node_ops_info a per-shard abort_source (a sharded<abort_source>)
that node_ops_meta_data starts and stops, so each shard's repair aborts
through its own local instance. A sketch of the underlying problem follows
the change summary below.

Fixes #11826

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #11827

* github.com:scylladb/scylladb:
  repair: use sharded abort_source to abort repair_info
  repair: node_ops_info: add start and stop methods
  storage_service: node_ops_abort_thread: abort all node ops on shutdown
  storage_service: node_ops_abort_thread: co_return only after printing log message
  storage_service: node_ops_meta_data: add start and stop methods
  repair: node_ops_info: prevent accidental copy
Committed by Botond Dénes, 2022-10-31 09:43:34 +02:00
4 changed files with 92 additions and 11 deletions
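
For background on issue #11826 (an illustration only, not code from this
series): seastar::shared_ptr and lw_shared_ptr keep non-atomic, per-shard
reference counts, so copying or destroying one on a shard other than the one
that created it is a data race. The snippet below, with the hypothetical name
anti_pattern, shows the shape of the cross-shard copy that passing
ops_info->as into every shard's repair used to perform:

// Illustration only -- not part of this change. Seastar smart-pointer
// refcounts are per-shard and non-atomic, so they must not be touched off-shard.
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>

seastar::future<> anti_pattern() {
    // Created on the calling shard; its refcount may only be touched there.
    auto as = seastar::make_shared<seastar::abort_source>();
    // BAD: invoke_on_all copies the lambda (and the captured shared_ptr) for
    // every shard, and each copy is destroyed on a foreign shard, racing on
    // the non-atomic refcount.
    return seastar::smp::invoke_on_all([as] {
        if (as->abort_requested()) {
            // react to the abort on this shard...
        }
    });
}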

@@ -49,6 +49,12 @@
logging::logger rlogger("repair");
node_ops_info::node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept
: ops_uuid(ops_uuid_)
, as(std::move(as_))
, ignore_nodes(std::move(ignore_nodes_))
{}
void node_ops_info::check_abort() {
if (as && as->abort_requested()) {
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
@@ -57,6 +63,28 @@ void node_ops_info::check_abort() {
}
}
future<> node_ops_info::start() {
if (as) {
co_await _sas.start();
_abort_subscription = as->subscribe([this] () noexcept {
_abort_done = _sas.invoke_on_all([] (abort_source& as) noexcept {
as.request_abort();
});
});
}
}
future<> node_ops_info::stop() noexcept {
if (as) {
co_await std::exchange(_abort_done, make_ready_future<>());
co_await _sas.stop();
}
}
abort_source* node_ops_info::local_abort_source() {
return as ? &_sas.local() : nullptr;
}
node_ops_metrics::node_ops_metrics(tracker& tracker)
: _tracker(tracker)
{
@@ -532,7 +560,7 @@ repair_info::repair_info(repair_service& repair,
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
shared_ptr<abort_source> as,
abort_source* as,
bool hints_batchlog_flushed)
: rs(repair)
, db(repair.get_db())
@@ -1295,9 +1323,10 @@ future<> repair_service::do_sync_data_using_repair(
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});

@@ -67,11 +67,28 @@ struct repair_uniq_id {
};
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
struct node_ops_info {
class node_ops_info {
public:
utils::UUID ops_uuid;
shared_ptr<abort_source> as;
std::list<gms::inet_address> ignore_nodes;
private:
optimized_optional<abort_source::subscription> _abort_subscription;
sharded<abort_source> _sas;
future<> _abort_done = make_ready_future<>();
public:
node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
node_ops_info(const node_ops_info&) = delete;
node_ops_info(node_ops_info&&) = delete;
future<> start();
future<> stop() noexcept;
void check_abort();
abort_source* local_abort_source();
};
// NOTE: repair_start() can be run on any node, but starts a node-global
@@ -179,7 +196,7 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
shared_ptr<abort_source> as,
abort_source* as,
bool hints_batchlog_flushed);
void check_failed_ranges();
void abort() noexcept;
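
Pulling the repair.cc and repair.hh fragments above together, this is roughly
the lifecycle node_ops_info now implements. It is a self-contained sketch
assuming a plain Seastar build; the class name sharded_abort_fanout and its
member names are illustrative, mirroring node_ops_info::start(), stop() and
local_abort_source(). One node-wide abort_source is subscribed to once,
aborting it fans request_abort() out to a sharded<abort_source>, and each
shard's consumer takes a raw pointer to its local instance.

// Sketch only (illustrative names); mirrors the pattern added to node_ops_info.
#include <utility>
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/optimized_optional.hh>

class sharded_abort_fanout {
    seastar::shared_ptr<seastar::abort_source> _global;  // node-wide source (may be null)
    seastar::sharded<seastar::abort_source> _per_shard;  // one abort_source per shard
    seastar::optimized_optional<seastar::abort_source::subscription> _sub;
    seastar::future<> _fanout_done = seastar::make_ready_future<>();
public:
    explicit sharded_abort_fanout(seastar::shared_ptr<seastar::abort_source> global) noexcept
        : _global(std::move(global)) {}

    seastar::future<> start() {
        if (_global) {
            co_await _per_shard.start();
            // When the global source fires, fan the abort out to every shard's copy.
            _sub = _global->subscribe([this] () noexcept {
                _fanout_done = _per_shard.invoke_on_all([] (seastar::abort_source& as) noexcept {
                    as.request_abort();
                });
            });
        }
    }

    seastar::future<> stop() noexcept {
        if (_global) {
            // Let any in-flight fan-out finish before tearing down the sharded service.
            co_await std::exchange(_fanout_done, seastar::make_ready_future<>());
            co_await _per_shard.stop();
        }
    }

    // Shard-local consumers (e.g. repair_info) take a plain pointer to their own
    // shard's abort_source, so no smart pointer ever crosses a shard boundary.
    seastar::abort_source* local_abort_source() {
        return _global ? &_per_shard.local() : nullptr;
    }
};

In the actual change, node_ops_meta_data simply forwards start() and stop() to
its node_ops_info, and do_sync_data_using_repair hands local_abort_source() to
each shard's repair_info, as the repair.cc hunk above and the storage_service
hunks below show.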

@@ -2539,6 +2539,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2587,6 +2588,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2631,6 +2633,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
// Wait for local node has marked replacing node as alive
@@ -2685,6 +2688,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2930,7 +2934,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
auto ops_uuid = utils::make_random_uuid();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, nullptr, std::list<gms::inet_address>());
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
@@ -3663,11 +3667,19 @@ node_ops_meta_data::node_ops_meta_data(
, _abort(std::move(abort_func))
, _abort_source(seastar::make_shared<abort_source>())
, _signal(std::move(signal_func))
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
, _ops(seastar::make_shared<node_ops_info>(_ops_uuid, _abort_source, std::move(ignore_nodes)))
, _watchdog([sig = _signal] { sig(); }) {
_watchdog.arm(_watchdog_interval);
}
future<> node_ops_meta_data::start() {
return _ops ? _ops->start() : make_ready_future<>();
}
future<> node_ops_meta_data::stop() noexcept {
return _ops ? _ops->stop() : make_ready_future<>();
}
future<> node_ops_meta_data::abort() {
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
_watchdog.cancel();
@@ -3713,6 +3725,7 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
meta.cancel_watchdog();
co_await meta.stop();
_node_ops.erase(it);
}
}
@@ -3720,6 +3733,24 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
if (!ops_uuid) {
for (auto& [uuid, meta] : _node_ops) {
co_await meta.abort();
auto as = meta.get_abort_source();
if (as && !as->abort_requested()) {
as->request_abort();
}
}
for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) {
node_ops_meta_data& meta = it->second;
co_await meta.stop();
}
co_return;
}
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3728,6 +3759,7 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
if (as && !as->abort_requested()) {
as->request_abort();
}
co_await meta.stop();
_node_ops.erase(it);
}
}
@@ -3746,17 +3778,18 @@ future<> storage_service::node_ops_abort_thread() {
while (!_node_ops_abort_queue.empty()) {
auto uuid_opt = _node_ops_abort_queue.front();
_node_ops_abort_queue.pop_front();
if (!uuid_opt) {
co_return;
}
try {
co_await node_ops_abort(*uuid_opt);
co_await node_ops_abort(uuid_opt.value_or(utils::null_uuid()));
} catch (...) {
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
}
if (!uuid_opt) {
slogger.info("Stopped node_ops_abort_thread");
co_return;
}
}
}
slogger.info("Stopped node_ops_abort_thread");
__builtin_unreachable();
}
} // namespace service

@@ -105,6 +105,8 @@ public:
std::list<gms::inet_address> ignore_nodes,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func);
future<> start();
future<> stop() noexcept;
shared_ptr<node_ops_info> get_ops_info();
shared_ptr<abort_source> get_abort_source();
future<> abort();