repair: Make removenode safe by default
Currently removenode works like below: - The coordinator node advertises the node to be removed in REMOVING_TOKEN status in gossip - Existing nodes learn the node in REMOVING_TOKEN status - Existing nodes sync data for the range it owns - Existing nodes send notification to the coordinator - The coordinator node waits for notification and announce the node in REMOVED_TOKEN Current problems: - Existing nodes do not tell the coordinator if the data sync is ok or failed. - The coordinator can not abort the removenode operation in case of error - Failed removenode operation will make the node to be removed in REMOVING_TOKEN forever. - The removenode runs in best effort mode which may cause data consistency issues. It means if a node that owns the range after the removenode operation is down during the operation, the removenode node operation will continue to succeed without requiring that node to perform data syncing. This can cause data consistency issues. For example, Five nodes in the cluster, RF = 3, for a range, n1, n2, n3 is the old replicas, n2 is being removed, after the removenode operation, the new replicas are n1, n5, n3. If n3 is down during the removenode operation, only n1 will be used to sync data with the new owner n5. This will break QUORUM read consistency if n1 happens to miss some writes. Improvements in this patch: - This patch makes the removenode safe by default. We require all nodes in the cluster to participate in the removenode operation and sync data if needed. We fail the removenode operation if any of them is down or fails. If the user want the removenode operation to succeed even if some of the nodes are not available, the user has to explicitly pass a list of nodes that can be skipped for the operation. $ nodetool removenode --ignore-dead-nodes <list_of_dead_nodes_to_ignore> <host_id> Example restful api: $ curl -X POST "http://127.0.0.1:10000/storage_service/remove_node/?host_id=7bd303e9-4c7b-4915-84f6-343d0dbd9a49&ignore_nodes=127.0.0.3,127.0.0.5" - The coordinator can abort data sync on existing nodes For example, if one of the nodes fails to sync data. It makes no sense for other nodes to continue to sync data because the whole operation will fail anyway. - The coordinator can decide which nodes to ignore and pass the decision to other nodes Previously, there is no way for the coordinator to tell existing nodes to run in strict mode or best effort mode. Users will have to modify config file or run a restful api cmd on all the nodes to select strict or best effort mode. With this patch, the cluster wide configuration is eliminated. Fixes #7359 Closes #7626
This commit is contained in:
@@ -1105,6 +1105,14 @@
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"ignore_nodes",
|
||||
"description":"List of dead nodes to ingore in removenode operation",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include <time.h>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
#include <boost/algorithm/string/trim_all.hpp>
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/load_meter.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
@@ -496,7 +497,22 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
|
||||
ss::remove_node.set(r, [](std::unique_ptr<request> req) {
|
||||
auto host_id = req->get_query_param("host_id");
|
||||
return service::get_local_storage_service().removenode(host_id).then([] {
|
||||
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
|
||||
auto ignore_nodes = std::list<gms::inet_address>();
|
||||
for (std::string n : ignore_nodes_strs) {
|
||||
try {
|
||||
std::replace(n.begin(), n.end(), '\"', ' ');
|
||||
std::replace(n.begin(), n.end(), '\'', ' ');
|
||||
boost::trim_all(n);
|
||||
if (!n.empty()) {
|
||||
auto node = gms::inet_address(n);
|
||||
ignore_nodes.push_back(node);
|
||||
}
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
|
||||
}
|
||||
}
|
||||
return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -103,3 +103,22 @@ enum class repair_row_level_start_status: uint8_t {
|
||||
struct repair_row_level_start_response {
|
||||
repair_row_level_start_status status;
|
||||
};
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
};
|
||||
|
||||
struct node_ops_cmd_request {
|
||||
node_ops_cmd cmd;
|
||||
utils::UUID ops_uuid;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
};
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
bool ok;
|
||||
};
|
||||
|
||||
@@ -332,6 +332,7 @@ public:
|
||||
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
|
||||
|
||||
void add_leaving_endpoint(inet_address endpoint);
|
||||
void del_leaving_endpoint(inet_address endpoint);
|
||||
public:
|
||||
void remove_endpoint(inet_address endpoint);
|
||||
#if 0
|
||||
@@ -1546,6 +1547,10 @@ void token_metadata_impl::add_leaving_endpoint(inet_address endpoint) {
|
||||
_leaving_endpoints.emplace(endpoint);
|
||||
}
|
||||
|
||||
void token_metadata_impl::del_leaving_endpoint(inet_address endpoint) {
|
||||
_leaving_endpoints.erase(endpoint);
|
||||
}
|
||||
|
||||
void token_metadata_impl::add_replacing_endpoint(inet_address existing_node, inet_address replacing_node) {
|
||||
tlogger.info("Added node {} as pending replacing endpoint which replaces existing node {}",
|
||||
replacing_node, existing_node);
|
||||
@@ -1806,6 +1811,11 @@ token_metadata::add_leaving_endpoint(inet_address endpoint) {
|
||||
_impl->add_leaving_endpoint(endpoint);
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::del_leaving_endpoint(inet_address endpoint) {
|
||||
_impl->del_leaving_endpoint(endpoint);
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::remove_endpoint(inet_address endpoint) {
|
||||
_impl->remove_endpoint(endpoint);
|
||||
|
||||
@@ -236,6 +236,7 @@ public:
|
||||
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
|
||||
|
||||
void add_leaving_endpoint(inet_address endpoint);
|
||||
void del_leaving_endpoint(inet_address endpoint);
|
||||
|
||||
void remove_endpoint(inet_address endpoint);
|
||||
|
||||
|
||||
@@ -504,6 +504,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
|
||||
case messaging_verb::NODE_OPS_CMD:
|
||||
case messaging_verb::HINT_MUTATION:
|
||||
return 1;
|
||||
case messaging_verb::CLIENT_ID:
|
||||
@@ -1349,6 +1350,17 @@ future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_rep
|
||||
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
|
||||
}
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void messaging_service::register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) {
|
||||
register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_node_ops_cmd() {
|
||||
return unregister_handler(messaging_verb::NODE_OPS_CMD);
|
||||
}
|
||||
future<node_ops_cmd_response> messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) {
|
||||
return send_message<future<node_ops_cmd_response>>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req));
|
||||
}
|
||||
|
||||
void
|
||||
messaging_service::register_paxos_prepare(std::function<future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>(
|
||||
const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot,
|
||||
|
||||
@@ -143,7 +143,8 @@ enum class messaging_verb : int32_t {
|
||||
HINT_MUTATION = 42,
|
||||
PAXOS_PRUNE = 43,
|
||||
GOSSIP_GET_ENDPOINT_STATES = 44,
|
||||
LAST = 45,
|
||||
NODE_OPS_CMD = 45,
|
||||
LAST = 46,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -394,6 +395,11 @@ public:
|
||||
future<> unregister_repair_get_diff_algorithms();
|
||||
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func);
|
||||
future<> unregister_node_ops_cmd();
|
||||
future<node_ops_cmd_response> send_node_ops_cmd(msg_addr id, node_ops_cmd_request);
|
||||
|
||||
// Wrapper for GOSSIP_ECHO verb
|
||||
void register_gossip_echo(std::function<future<> ()>&& func);
|
||||
future<> unregister_gossip_echo();
|
||||
|
||||
@@ -53,6 +53,14 @@ logging::logger rlogger("repair");
|
||||
|
||||
static sharded<netw::messaging_service>* _messaging;
|
||||
|
||||
void node_ops_info::check_abort() {
|
||||
if (abort) {
|
||||
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
|
||||
rlogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
}
|
||||
|
||||
class node_ops_metrics {
|
||||
public:
|
||||
node_ops_metrics() {
|
||||
@@ -434,6 +442,16 @@ void tracker::abort_all_repairs() {
|
||||
rlogger.info0("Aborted {} repair job(s)", count);
|
||||
}
|
||||
|
||||
void tracker::abort_repair_node_ops(utils::UUID ops_uuid) {
|
||||
for (auto& x : _repairs[this_shard_id()]) {
|
||||
auto& ri = x.second;
|
||||
if (ri->ops_uuid() && ri->ops_uuid().value() == ops_uuid) {
|
||||
rlogger.info0("Aborted repair jobs for ops_uuid={}", ops_uuid);
|
||||
ri->abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
float tracker::report_progress(streaming::stream_reason reason) {
|
||||
uint64_t nr_ranges_finished = 0;
|
||||
uint64_t nr_ranges_total = 0;
|
||||
@@ -792,7 +810,8 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
||||
repair_uniq_id id_,
|
||||
const std::vector<sstring>& data_centers_,
|
||||
const std::vector<sstring>& hosts_,
|
||||
streaming::stream_reason reason_)
|
||||
streaming::stream_reason reason_,
|
||||
std::optional<utils::UUID> ops_uuid)
|
||||
: db(db_)
|
||||
, messaging(ms_)
|
||||
, sharder(get_sharder_for_tables(db_, keyspace_, table_ids_))
|
||||
@@ -806,7 +825,8 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
||||
, hosts(hosts_)
|
||||
, reason(reason_)
|
||||
, nr_ranges_total(ranges.size())
|
||||
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
|
||||
, _row_level_repair(db.local().features().cluster_supports_row_level_repair())
|
||||
, _ops_uuid(std::move(ops_uuid)) {
|
||||
}
|
||||
|
||||
future<> repair_info::do_streaming() {
|
||||
@@ -1625,7 +1645,7 @@ static int do_repair_start(seastar::sharded<database>& db, seastar::sharded<netw
|
||||
_node_ops_metrics.repair_total_ranges_sum += ranges.size();
|
||||
auto ri = make_lw_shared<repair_info>(db, ms,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair);
|
||||
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair, id.uuid);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1695,14 +1715,15 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
sstring keyspace,
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason) {
|
||||
streaming::stream_reason reason,
|
||||
std::optional<utils::UUID> ops_uuid) {
|
||||
if (ranges.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||
return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
|
||||
repair_uniq_id id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
|
||||
return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||
return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
|
||||
auto cfs = list_column_families(db.local(), keyspace);
|
||||
if (cfs.empty()) {
|
||||
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
|
||||
@@ -1712,12 +1733,12 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason] (database& localdb) mutable {
|
||||
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (database& localdb) mutable {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ri = make_lw_shared<repair_info>(db, ms,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), reason);
|
||||
id, std::move(data_centers), std::move(hosts), reason, ops_uuid);
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
@@ -1910,16 +1931,16 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
|
||||
}
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, {}).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
|
||||
});
|
||||
}
|
||||
|
||||
static future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) {
|
||||
static future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
using inet_address = gms::inet_address;
|
||||
return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node)] () mutable {
|
||||
return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable {
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
bool is_removenode = myip != leaving_node;
|
||||
@@ -1978,6 +1999,9 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
auto local_dc = get_local_dc();
|
||||
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
|
||||
for (auto&r : ranges) {
|
||||
if (ops) {
|
||||
ops->check_abort();
|
||||
}
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes);
|
||||
const std::vector<inet_address>& current_eps = current_replica_endpoints[r];
|
||||
@@ -2059,6 +2083,12 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
}
|
||||
neighbors_set.erase(myip);
|
||||
neighbors_set.erase(leaving_node);
|
||||
// Remove nodes in ignore_nodes
|
||||
if (ops) {
|
||||
for (const auto& node : ops->ignore_nodes) {
|
||||
neighbors_set.erase(node);
|
||||
}
|
||||
}
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors_set |
|
||||
boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) {
|
||||
return snitch_ptr->get_datacenter(node) == local_dc;
|
||||
@@ -2070,9 +2100,10 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, skipped",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors);
|
||||
} else {
|
||||
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors);
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors));
|
||||
std::vector<gms::inet_address> mandatory_neighbors = is_removenode ? neighbors : std::vector<gms::inet_address>{};
|
||||
rlogger.info("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, mandatory_neighbor={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors, mandatory_neighbors);
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors));
|
||||
if (is_removenode) {
|
||||
ranges_for_removenode.push_back(r);
|
||||
}
|
||||
@@ -2091,7 +2122,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
ranges.swap(ranges_for_removenode);
|
||||
}
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, ops->ops_uuid).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||
}
|
||||
@@ -2100,11 +2131,17 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
}
|
||||
|
||||
future<> decommission_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr) {
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address());
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {});
|
||||
}
|
||||
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) {
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node));
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node), std::move(ops));
|
||||
}
|
||||
|
||||
future<> abort_repair_node_ops(utils::UUID ops_uuid) {
|
||||
return smp::invoke_on_all([ops_uuid] {
|
||||
return repair_tracker().abort_repair_node_ops(ops_uuid);
|
||||
});
|
||||
}
|
||||
|
||||
static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason) {
|
||||
@@ -2179,7 +2216,7 @@ static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, s
|
||||
}).get();
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, {}).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||
@@ -2218,12 +2255,19 @@ static future<> init_messaging_service_handler(sharded<database>& db, sharded<ne
|
||||
return checksum_range(db, keyspace, cf, range, hv);
|
||||
});
|
||||
});
|
||||
ms.register_node_ops_cmd([] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [coordinator, req = std::move(req)] () mutable {
|
||||
return service::get_local_storage_service().node_ops_cmd_handler(coordinator, std::move(req));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static future<> uninit_messaging_service_handler() {
|
||||
return _messaging->invoke_on_all([] (auto& ms) {
|
||||
return ms.unregister_repair_checksum_range();
|
||||
return when_all_succeed(ms.unregister_repair_checksum_range(), ms.unregister_node_ops_cmd()).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -76,13 +76,22 @@ struct repair_uniq_id {
|
||||
};
|
||||
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
|
||||
|
||||
struct node_ops_info {
|
||||
utils::UUID ops_uuid;
|
||||
bool abort = false;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
void check_abort();
|
||||
};
|
||||
|
||||
// The tokens are the tokens assigned to the bootstrap node.
|
||||
future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr);
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node);
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> rebuild_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens);
|
||||
|
||||
future<> abort_repair_node_ops(utils::UUID ops_uuid);
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
// operation.
|
||||
// repair_start() starts the requested repair on this node. It returns an
|
||||
@@ -244,6 +253,7 @@ public:
|
||||
bool _row_level_repair;
|
||||
uint64_t _sub_ranges_nr = 0;
|
||||
std::unordered_set<sstring> dropped_tables;
|
||||
std::optional<utils::UUID> _ops_uuid;
|
||||
public:
|
||||
repair_info(seastar::sharded<database>& db_,
|
||||
seastar::sharded<netw::messaging_service>& ms_,
|
||||
@@ -253,7 +263,8 @@ public:
|
||||
repair_uniq_id id_,
|
||||
const std::vector<sstring>& data_centers_,
|
||||
const std::vector<sstring>& hosts_,
|
||||
streaming::stream_reason reason_);
|
||||
streaming::stream_reason reason_,
|
||||
std::optional<utils::UUID> ops_uuid);
|
||||
future<> do_streaming();
|
||||
void check_failed_ranges();
|
||||
future<> request_transfer_ranges(const sstring& cf,
|
||||
@@ -272,6 +283,9 @@ public:
|
||||
const std::vector<sstring>& table_names() {
|
||||
return cfs;
|
||||
}
|
||||
const std::optional<utils::UUID>& ops_uuid() const {
|
||||
return _ops_uuid;
|
||||
};
|
||||
};
|
||||
|
||||
// The repair_tracker tracks ongoing repair operations and their progress.
|
||||
@@ -324,6 +338,7 @@ public:
|
||||
future<> run(repair_uniq_id id, std::function<void ()> func);
|
||||
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
|
||||
float report_progress(streaming::stream_reason reason);
|
||||
void abort_repair_node_ops(utils::UUID ops_uuid);
|
||||
};
|
||||
|
||||
future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
|
||||
@@ -464,6 +479,27 @@ enum class row_level_diff_detect_algorithm : uint8_t {
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
};
|
||||
|
||||
// The cmd and ops_uuid are mandatory for each request.
|
||||
// The ignore_nodes and leaving_node are optional.
|
||||
struct node_ops_cmd_request {
|
||||
node_ops_cmd cmd;
|
||||
utils::UUID ops_uuid;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
};
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
bool ok;
|
||||
};
|
||||
|
||||
namespace std {
|
||||
template<>
|
||||
struct hash<partition_checksum> {
|
||||
|
||||
@@ -107,6 +107,7 @@ storage_service::storage_service(abort_source& abort_source, distributed<databas
|
||||
, _service_memory_total(config.available_memory / 10)
|
||||
, _service_memory_limiter(_service_memory_total)
|
||||
, _for_testing(for_testing)
|
||||
, _node_ops_abort_thread(node_ops_abort_thread())
|
||||
, _shared_token_metadata(stm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _view_update_generator(view_update_generator)
|
||||
@@ -1742,9 +1743,12 @@ future<> storage_service::gossip_sharder() {
|
||||
|
||||
future<> storage_service::stop() {
|
||||
// make sure nobody uses the semaphore
|
||||
node_ops_singal_abort(std::nullopt);
|
||||
return _service_memory_limiter.wait(_service_memory_total).finally([this] {
|
||||
_listeners.clear();
|
||||
return _schema_version_publisher.join();
|
||||
}).finally([this] {
|
||||
return std::move(_node_ops_abort_thread);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2163,102 +2167,192 @@ future<> storage_service::decommission() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::removenode(sstring host_id_string) {
|
||||
return run_with_api_lock(sstring("removenode"), [host_id_string] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id_string] {
|
||||
slogger.debug("removenode: host_id = {}", host_id_string);
|
||||
auto my_address = ss.get_broadcast_address();
|
||||
auto tmlock = std::make_unique<token_metadata_lock>(ss.get_token_metadata_lock().get0());
|
||||
auto tmptr = ss.get_mutable_token_metadata_ptr().get0();
|
||||
auto local_host_id = tmptr->get_host_id(my_address);
|
||||
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
|
||||
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
|
||||
auto uuid = utils::make_random_uuid();
|
||||
auto tmptr = ss.get_token_metadata_ptr();
|
||||
auto host_id = utils::UUID(host_id_string);
|
||||
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
|
||||
if (!endpoint_opt) {
|
||||
throw std::runtime_error("Host ID not found.");
|
||||
throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid));
|
||||
}
|
||||
auto endpoint = *endpoint_opt;
|
||||
|
||||
auto tokens = tmptr->get_tokens(endpoint);
|
||||
auto leaving_nodes = std::list<gms::inet_address>{endpoint};
|
||||
|
||||
slogger.debug("removenode: endpoint = {}", endpoint);
|
||||
future<> heartbeat_updater = make_ready_future<>();
|
||||
auto heartbeat_updater_done = make_lw_shared<bool>(false);
|
||||
|
||||
if (endpoint == my_address) {
|
||||
throw std::runtime_error("Cannot remove self");
|
||||
// Step 1: Decide who needs to sync data
|
||||
//
|
||||
// By default, we require all nodes in the cluster to participate
|
||||
// the removenode operation and sync data if needed. We fail the
|
||||
// removenode operation if any of them is down or fails.
|
||||
//
|
||||
// If the user want the removenode opeartion to succeed even if some of the nodes
|
||||
// are not available, the user has to explicitly pass a list of
|
||||
// node that can be skipped for the operation.
|
||||
std::vector<gms::inet_address> nodes;
|
||||
for (const auto& x : tmptr->get_endpoint_to_host_id_map_for_reading()) {
|
||||
seastar::thread::maybe_yield();
|
||||
if (x.first != endpoint && std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) {
|
||||
nodes.push_back(x.first);
|
||||
}
|
||||
}
|
||||
slogger.info("removenode[{}]: Started removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
|
||||
if (ss._gossiper.get_live_members().contains(endpoint)) {
|
||||
throw std::runtime_error(format("Node {} is alive and owns this ID. Use decommission command to remove it from the ring", endpoint));
|
||||
}
|
||||
|
||||
// A leaving endpoint that is dead is already being removed.
|
||||
if (tmptr->is_leaving(endpoint)) {
|
||||
slogger.warn("Node {} is already being removed, continuing removal anyway", endpoint);
|
||||
}
|
||||
|
||||
if (!ss._replicating_nodes.empty()) {
|
||||
throw std::runtime_error("This node is already processing a removal. Wait for it to complete, or use 'removenode force' if this has failed.");
|
||||
}
|
||||
|
||||
auto non_system_keyspaces = ss.db().local().get_non_system_keyspaces();
|
||||
// Find the endpoints that are going to become responsible for data
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
auto& ks = ss.db().local().find_keyspace(keyspace_name);
|
||||
// if the replication factor is 1 the data is lost so we shouldn't wait for confirmation
|
||||
if (ks.get_replication_strategy().get_replication_factor() == 1) {
|
||||
slogger.warn("keyspace={} has replication factor 1, the data is probably lost", keyspace_name);
|
||||
continue;
|
||||
// Step 2: Prepare to sync data
|
||||
std::unordered_set<gms::inet_address> nodes_unknown_verb;
|
||||
std::unordered_set<gms::inet_address> nodes_down;
|
||||
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes};
|
||||
try {
|
||||
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got prepare response from node={}", uuid, node);
|
||||
}).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) {
|
||||
slogger.warn("removenode[{}]: Node {} does not support removenode verb", uuid, node);
|
||||
nodes_unknown_verb.emplace(node);
|
||||
}).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) {
|
||||
slogger.warn("removenode[{}]: Node {} is down for node_ops_cmd verb", uuid, node);
|
||||
nodes_down.emplace(node);
|
||||
});
|
||||
}).get();
|
||||
if (!nodes_unknown_verb.empty()) {
|
||||
auto msg = format("removenode[{}]: Nodes={} do not support removenode verb. Please upgrade your cluster and run removenode again.", uuid, nodes_unknown_verb);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
auto msg = format("removenode[{}]: Nodes={} needed for removenode operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, run nodetool removenode --ignore-dead-nodes <list_of_dead_nodes> <host_id>. E.g., nodetool removenode --ignore-dead-nodes 127.0.0.1,127.0.0.2 817e9515-316f-4fe3-aaab-b00d6f12dddd", uuid, nodes_down);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
// get all ranges that change ownership (that is, a node needs
|
||||
// to take responsibility for new range)
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges =
|
||||
ss.get_changed_ranges_for_leaving(keyspace_name, endpoint);
|
||||
for (auto& x: changed_ranges) {
|
||||
auto ep = x.second;
|
||||
if (ss._gossiper.is_alive(ep)) {
|
||||
ss._replicating_nodes.emplace(ep);
|
||||
} else {
|
||||
slogger.warn("Endpoint {} is down and will not receive data for re-replication of {}", ep, endpoint);
|
||||
// Step 3: Start heartbeat updater
|
||||
heartbeat_updater = seastar::async([&ss, &nodes, uuid, heartbeat_updater_done] {
|
||||
slogger.debug("removenode[{}]: Started heartbeat_updater", uuid);
|
||||
while (!(*heartbeat_updater_done)) {
|
||||
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}};
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got heartbeat response from node={}", uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).handle_exception([uuid] (std::exception_ptr ep) {
|
||||
slogger.warn("removenode[{}]: Failed to send heartbeat", uuid);
|
||||
}).get();
|
||||
int nr_seconds = 10;
|
||||
while (!(*heartbeat_updater_done) && nr_seconds--) {
|
||||
sleep_abortable(std::chrono::seconds(1), ss._abort_source).get();
|
||||
}
|
||||
}
|
||||
slogger.debug("removenode[{}]: Stopped heartbeat_updater", uuid);
|
||||
});
|
||||
auto stop_heartbeat_updater = defer([&] {
|
||||
*heartbeat_updater_done = true;
|
||||
heartbeat_updater.get();
|
||||
});
|
||||
|
||||
// Step 4: Start to sync data
|
||||
req.cmd = node_ops_cmd::removenode_sync_data;
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got sync_data response from node={}", uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
|
||||
|
||||
// Step 5: Announce the node has left
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint);
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
|
||||
// Step 6: Finish
|
||||
req.cmd = node_ops_cmd::removenode_done;
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got done response from node={}", uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
} catch (...) {
|
||||
// we need to revert the effect of prepare verb the removenode ops is failed
|
||||
req.cmd = node_ops_cmd::removenode_abort;
|
||||
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
|
||||
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node)) {
|
||||
// No need to revert previous prepare cmd for those who do not apply prepare cmd.
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got abort response from node={}", uuid, node);
|
||||
});
|
||||
}).get();
|
||||
slogger.info("removenode[{}]: Aborted removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
throw;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
|
||||
return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
|
||||
return seastar::async([&ss, coordinator, req = std::move(req)] () mutable {
|
||||
auto ops_uuid = req.ops_uuid;
|
||||
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
|
||||
if (req.cmd == node_ops_cmd::removenode_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
||||
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); });
|
||||
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_done) {
|
||||
slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_sync_data) {
|
||||
auto it = ss._node_ops.find(ops_uuid);
|
||||
if (it == ss._node_ops.end()) {
|
||||
throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid));
|
||||
}
|
||||
auto ops = it->second.get_ops_info();
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
removenode_with_repair(ss._db, ss._messaging, ss.get_token_metadata_ptr(), node, ops).get();
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
} else {
|
||||
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
slogger.info("removenode: endpoint = {}, replicating_nodes = {}", endpoint, ss._replicating_nodes);
|
||||
ss._removing_node = endpoint;
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
ss.update_pending_ranges(tmptr, format("removenode {}", endpoint)).get();
|
||||
ss.replicate_to_all_cores(std::move(tmptr)).get();
|
||||
tmlock.reset();
|
||||
|
||||
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
|
||||
// we add our own token so other nodes to let us know when they're done
|
||||
ss._gossiper.advertise_removing(endpoint, host_id, local_host_id).get();
|
||||
|
||||
// kick off streaming commands
|
||||
// No need to wait for restore_replica_count to complete, since
|
||||
// when it completes, the node will be removed from _replicating_nodes,
|
||||
// and we wait for _replicating_nodes to become empty below
|
||||
//FIXME: discarded future.
|
||||
(void)ss.restore_replica_count(endpoint, my_address).handle_exception([endpoint, my_address] (auto ep) {
|
||||
slogger.info("Failed to restore_replica_count for node {} on node {}", endpoint, my_address);
|
||||
});
|
||||
|
||||
// wait for ReplicationFinishedVerbHandler to signal we're done
|
||||
while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) {
|
||||
sleep_abortable(std::chrono::milliseconds(100), ss._abort_source).get();
|
||||
}
|
||||
|
||||
if (ss._force_remove_completion) {
|
||||
throw std::runtime_error("nodetool removenode force is called by user");
|
||||
}
|
||||
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint);
|
||||
|
||||
// gossiper will indicate the token has left
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
|
||||
ss._replicating_nodes.clear();
|
||||
ss._removing_node = std::nullopt;
|
||||
node_ops_cmd_response resp;
|
||||
resp.ok = true;
|
||||
return resp;
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2499,7 +2593,9 @@ void storage_service::unbootstrap() {
|
||||
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint).finally([this, notify_endpoint] () {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::list<gms::inet_address>()});
|
||||
return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
|
||||
return send_replication_notification(notify_endpoint);
|
||||
});
|
||||
}
|
||||
@@ -3214,5 +3310,111 @@ bool storage_service::is_repair_based_node_ops_enabled() {
|
||||
return _db.local().get_config().enable_repair_based_node_ops();
|
||||
}
|
||||
|
||||
node_ops_meta_data::node_ops_meta_data(
|
||||
utils::UUID ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
shared_ptr<node_ops_info> ops,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func)
|
||||
: _ops_uuid(std::move(ops_uuid))
|
||||
, _coordinator(std::move(coordinator))
|
||||
, _abort(std::move(abort_func))
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(std::move(ops))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::abort() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
|
||||
_aborted = true;
|
||||
if (_ops) {
|
||||
_ops->abort = true;
|
||||
}
|
||||
_watchdog.cancel();
|
||||
return _abort();
|
||||
}
|
||||
|
||||
void node_ops_meta_data::update_watchdog() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} update_watchdog", _ops_uuid);
|
||||
if (_aborted) {
|
||||
return;
|
||||
}
|
||||
_watchdog.cancel();
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
void node_ops_meta_data::cancel_watchdog() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} cancel_watchdog", _ops_uuid);
|
||||
_watchdog.cancel();
|
||||
}
|
||||
|
||||
shared_ptr<node_ops_info> node_ops_meta_data::get_ops_info() {
|
||||
return _ops;
|
||||
}
|
||||
|
||||
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.update_watchdog();
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.cancel_watchdog();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.abort().get();
|
||||
abort_repair_node_ops(ops_uuid).get();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_singal_abort(std::optional<utils::UUID> ops_uuid) {
|
||||
slogger.debug("node_ops_singal_abort: ops_uuid={}", ops_uuid);
|
||||
_node_ops_abort_queue.push_back(ops_uuid);
|
||||
_node_ops_abort_cond.signal();
|
||||
}
|
||||
|
||||
future<> storage_service::node_ops_abort_thread() {
|
||||
return seastar::async([this] {
|
||||
slogger.info("Started node_ops_abort_thread");
|
||||
for (;;) {
|
||||
_node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); }).get();
|
||||
slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
|
||||
while (!_node_ops_abort_queue.empty()) {
|
||||
auto uuid_opt = _node_ops_abort_queue.front();
|
||||
_node_ops_abort_queue.pop_front();
|
||||
if (!uuid_opt) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storage_service::node_ops_abort(*uuid_opt);
|
||||
} catch (...) {
|
||||
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
} // namespace service
|
||||
|
||||
|
||||
@@ -63,6 +63,12 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include "sstables/version.hh"
|
||||
#include "cdc/metadata.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/lowres_clock.hh>
|
||||
|
||||
class node_ops_cmd_request;
|
||||
class node_ops_cmd_response;
|
||||
class node_ops_info;
|
||||
|
||||
namespace cql_transport { class controller; }
|
||||
|
||||
@@ -103,6 +109,28 @@ struct storage_service_config {
|
||||
size_t available_memory;
|
||||
};
|
||||
|
||||
class node_ops_meta_data {
|
||||
utils::UUID _ops_uuid;
|
||||
gms::inet_address _coordinator;
|
||||
std::function<future<> ()> _abort;
|
||||
std::function<void ()> _signal;
|
||||
shared_ptr<node_ops_info> _ops;
|
||||
seastar::timer<lowres_clock> _watchdog;
|
||||
std::chrono::seconds _watchdog_interval{30};
|
||||
bool _aborted = false;
|
||||
public:
|
||||
explicit node_ops_meta_data(
|
||||
utils::UUID ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
shared_ptr<node_ops_info> ops,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
future<> abort();
|
||||
void update_watchdog();
|
||||
void cancel_watchdog();
|
||||
};
|
||||
|
||||
/**
|
||||
* This abstraction contains the token/identifier of this node
|
||||
* on the identifier space. This token gets gossiped around.
|
||||
@@ -158,6 +186,17 @@ private:
|
||||
* and would only slow down tests (by having them wait).
|
||||
*/
|
||||
bool _for_testing;
|
||||
|
||||
std::unordered_map<utils::UUID, node_ops_meta_data> _node_ops;
|
||||
std::list<std::optional<utils::UUID>> _node_ops_abort_queue;
|
||||
seastar::condition_variable _node_ops_abort_cond;
|
||||
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
|
||||
future<> _node_ops_abort_thread;
|
||||
void node_ops_update_heartbeat(utils::UUID ops_uuid);
|
||||
void node_ops_done(utils::UUID ops_uuid);
|
||||
void node_ops_abort(utils::UUID ops_uuid);
|
||||
void node_ops_singal_abort(std::optional<utils::UUID> ops_uuid);
|
||||
future<> node_ops_abort_thread();
|
||||
public:
|
||||
storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, sharded<service::migration_notifier>& mn, locator::shared_token_metadata& stm, sharded<netw::messaging_service>& ms, /* only for tests */ bool for_testing = false);
|
||||
|
||||
@@ -767,7 +806,8 @@ public:
|
||||
*
|
||||
* @param hostIdString token for the node
|
||||
*/
|
||||
future<> removenode(sstring host_id_string);
|
||||
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
|
||||
|
||||
future<sstring> get_operation_mode();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user