|
|
|
|
@@ -1032,7 +1032,7 @@ public:
|
|
|
|
|
|
|
|
|
|
// }}} raft_ip_address_updater
|
|
|
|
|
|
|
|
|
|
future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
|
|
|
|
|
future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
|
|
|
|
|
while (!_group0_as.abort_requested()) {
|
|
|
|
|
bool err = false;
|
|
|
|
|
try {
|
|
|
|
|
@@ -1134,7 +1134,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
|
|
|
|
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
|
|
|
|
std::optional<abort_source> as;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
@@ -1867,9 +1867,9 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
|
|
|
|
|
co_await raft_initialize_discovery_leader(join_params);
|
|
|
|
|
|
|
|
|
|
// start topology coordinator fiber
|
|
|
|
|
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
|
|
|
|
|
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, _group0->hold_group0_gate(), sys_dist_ks);
|
|
|
|
|
// start cleanup fiber
|
|
|
|
|
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy);
|
|
|
|
|
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, _group0->hold_group0_gate(), proxy);
|
|
|
|
|
|
|
|
|
|
// Need to start system_distributed_keyspace before bootstrap because bootstrapping
|
|
|
|
|
// process may access those tables.
|
|
|
|
|
@@ -2150,7 +2150,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
|
|
|
|
// Start the topology coordinator monitor fiber. If we are the leader, this will start
|
|
|
|
|
// the topology coordinator which is responsible for driving the upgrade process.
|
|
|
|
|
try {
|
|
|
|
|
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), sys_dist_ks);
|
|
|
|
|
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate(), sys_dist_ks);
|
|
|
|
|
} catch (...) {
|
|
|
|
|
// The calls above can theoretically fail due to coroutine frame allocation failure.
|
|
|
|
|
// Abort in this case as the node should be in a pretty bad shape anyway.
|
|
|
|
|
@@ -2176,7 +2176,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
_sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), proxy);
|
|
|
|
|
_sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy);
|
|
|
|
|
start_tablet_split_monitor();
|
|
|
|
|
} catch (...) {
|
|
|
|
|
rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception());
|
|
|
|
|
@@ -3649,6 +3649,7 @@ static size_t count_normal_token_owners(const topology& topology) {
|
|
|
|
|
|
|
|
|
|
future<> storage_service::raft_decommission() {
|
|
|
|
|
auto& raft_server = _group0->group0_server();
|
|
|
|
|
auto holder = _group0->hold_group0_gate();
|
|
|
|
|
utils::UUID request_id;
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
@@ -4650,6 +4651,7 @@ future<> storage_service::do_drain() {
|
|
|
|
|
|
|
|
|
|
future<> storage_service::do_cluster_cleanup() {
|
|
|
|
|
auto& raft_server = _group0->group0_server();
|
|
|
|
|
auto holder = _group0->hold_group0_gate();
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
|
|
|
|
@@ -4720,6 +4722,7 @@ future<> storage_service::wait_for_topology_not_busy() {
|
|
|
|
|
|
|
|
|
|
future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
|
|
|
|
|
auto& raft_server = _group0->group0_server();
|
|
|
|
|
auto holder = _group0->hold_group0_gate();
|
|
|
|
|
utils::UUID request_id;
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
@@ -5474,12 +5477,15 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
auto& raft_server = _group0->group0_server();
|
|
|
|
|
auto group0_holder = _group0->hold_group0_gate();
|
|
|
|
|
// do barrier to make sure we always see the latest topology
|
|
|
|
|
co_await raft_server.read_barrier(&_group0_as);
|
|
|
|
|
if (raft_server.get_current_term() != term) {
|
|
|
|
|
// Return an error since the command is from outdated leader
|
|
|
|
|
co_return result;
|
|
|
|
|
}
|
|
|
|
|
auto id = raft_server.id();
|
|
|
|
|
group0_holder.release();
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
auto& state = _raft_topology_cmd_handler_state;
|
|
|
|
|
@@ -5591,7 +5597,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
|
|
|
|
break;
|
|
|
|
|
case raft_topology_cmd::command::stream_ranges: {
|
|
|
|
|
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
|
|
|
|
|
const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
|
|
|
|
|
const auto& rs = _topology_state_machine._topology.find(id)->second;
|
|
|
|
|
auto tstate = _topology_state_machine._topology.tstate;
|
|
|
|
|
if (!rs.ring || rs.ring->tokens.empty()) {
|
|
|
|
|
rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
|
|
|
|
|
@@ -5640,11 +5646,11 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
|
|
|
|
utils::get_local_injector().inject("stop_after_streaming",
|
|
|
|
|
[] { std::raise(SIGSTOP); });
|
|
|
|
|
} else {
|
|
|
|
|
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
|
|
|
|
|
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
|
|
|
|
|
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
|
|
|
|
|
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server, replaced_id] () -> future<> {
|
|
|
|
|
if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) {
|
|
|
|
|
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id()));
|
|
|
|
|
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &id, replaced_id] () -> future<> {
|
|
|
|
|
if (!_topology_state_machine._topology.req_param.contains(id)) {
|
|
|
|
|
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
|
|
|
|
|
}
|
|
|
|
|
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
|
|
|
|
|
auto ignored_nodes = boost::copy_range<std::unordered_set<locator::host_id>>(_topology_state_machine._topology.ignored_nodes | boost::adaptors::transformed([] (const auto& id) {
|
|
|
|
|
@@ -5728,7 +5734,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
case node_state::rebuilding: {
|
|
|
|
|
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc;
|
|
|
|
|
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
|
|
|
|
|
rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
|
|
|
|
|
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
|
|
|
|
|
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
|
|
|
|
|
@@ -5775,7 +5781,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
|
|
|
|
case node_state::none:
|
|
|
|
|
case node_state::removing:
|
|
|
|
|
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
|
|
|
|
|
raft_server.id(), rs.state));
|
|
|
|
|
id, rs.state));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
@@ -6491,6 +6497,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
auto& g0_server = _group0->group0_server();
|
|
|
|
|
auto g0_holder = _group0->hold_group0_gate();
|
|
|
|
|
if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
|
|
|
|
|
// There is a peculiar case that can happen if the leader is killed
|
|
|
|
|
// and then replaced very quickly:
|
|
|
|
|
|