Files
scylladb/service/raft/group0_state_id_handler.cc
Patryk Jędrzejczak 6b2e003994 group0_state_id_handler: remove unused group0_server_accessor
It became unused in the previous commit.
2025-10-20 12:05:11 +02:00

143 lines
5.8 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "service/raft/group0_state_id_handler.hh"
#include "db/config.hh"
#include "compaction/compaction_manager.hh"
#include "gms/gossiper.hh"
#include "utils/log.hh"
#include "raft/server.hh"
#include "replica/database.hh"
#include "service/raft/raft_group_registry.hh"
namespace service {
static logging::logger slogger("group0_tombstone_gc_handler");
lowres_clock::duration group0_state_id_handler::get_refresh_interval(const replica::database& db) {
return std::chrono::milliseconds{db.get_config().group0_tombstone_gc_refresh_interval_in_ms()};
}
void group0_state_id_handler::refresh() {
// It's not enough to consider only the current group 0 members. In the Raft-based recovery procedure, there can be
// nodes that haven't joined the current group 0 yet but they have belonged to a different group 0 and thus have
// a non-empty group 0 state ID.
//
// Ignored nodes are permanently banned, so we can safely filter them out even if they belong to the group 0.
const auto& group0_members = std::ranges::join_view(std::to_array({
std::views::all(_topo_sm._topology.normal_nodes),
std::views::all(_topo_sm._topology.transition_nodes)
})) | std::views::keys | std::ranges::views::filter([this] (const raft::server_id& id) {
return !_topo_sm._topology.ignored_nodes.contains(id);
}) | std::ranges::to<std::vector>();
if (group0_members.empty()) {
slogger.info("Skipping due to empty group0");
return;
}
slogger.trace("Current node count: {}", group0_members.size());
std::vector<raft::server_id> group0_members_missing_endpoint;
std::vector<raft::server_id> group0_members_missing_state_id;
const auto& group0_members_state_ids = group0_members | std::ranges::views::transform([&](const auto& id) -> std::optional<utils::UUID> {
const auto* state_id_ptr = _gossiper.get_application_state_ptr(locator::host_id{id.uuid()}, gms::application_state::GROUP0_STATE_ID);
if (!state_id_ptr) {
group0_members_missing_state_id.push_back(id);
return std::nullopt;
}
return utils::UUID{state_id_ptr->value()};
}) | std::ranges::views::filter([](const auto& state_id) {
return state_id.has_value();
}) | std::ranges::views::transform([](const auto& state_id) {
return state_id.value();
}) | std::ranges::to<std::vector>();
if (!group0_members_missing_endpoint.empty()) {
slogger.info("Skipping due to missing endpoints for members: {}", fmt::join(group0_members_missing_endpoint, ", "));
return;
}
if (!group0_members_missing_state_id.empty()) {
slogger.info("Skipping due to missing state id of some endpoints: {}", fmt::join(group0_members_missing_state_id, ", "));
return;
}
const auto min_state_id = std::ranges::min(group0_members_state_ids, [](auto a, auto b) {
if (!a || !b) {
// This should never happen, but if it does, it's a bug.
on_fatal_internal_error(slogger, "unexpected empty state_id");
}
return utils::timeuuid_tri_compare(a, b) < 0;
});
if (_state_id_last_reconcile && utils::timeuuid_tri_compare(_state_id_last_reconcile, min_state_id) > 0) {
slogger.info("Skipping due to the stale minimum state id: {}", min_state_id);
return;
}
_state_id_last_reconcile = min_state_id;
const auto tombstone_gc_time = to_gc_clock(db_clock::time_point(utils::UUID_gen::unix_timestamp(min_state_id)))
// subtracting one resolution unit:
// - the GC time has 1s resolution
// - there might be state_ids with the same GC time that didn't propagate yet
// (and thus we don't want to GC these)
- gc_clock::duration{1};
slogger.info("Setting reconcile time to {} (min id={})", tombstone_gc_time, min_state_id);
auto& gc_state = _local_db.get_compaction_manager().get_shared_tombstone_gc_state();
gc_state.update_group0_refresh_time(tombstone_gc_time);
}
group0_state_id_handler::group0_state_id_handler(topology_state_machine& topo_sm, replica::database& local_db, gms::gossiper& gossiper)
: _topo_sm(topo_sm)
, _local_db(local_db)
, _gossiper(gossiper)
, _refresh_interval(get_refresh_interval(local_db)) {
}
void group0_state_id_handler::run() {
if (this_shard_id() != 0) {
on_fatal_internal_error(slogger, "group0_state_id_handler must be started on shard 0");
}
_timer.set_callback([this] {
refresh();
_timer.arm(_refresh_interval);
});
// do the first refresh "right away" (i.e., in 2 gossip heartbeats, when we should have someone to talk to);
// after that send every _refresh_interval.
_timer.arm(2 * gms::gossiper::INTERVAL);
}
future<> group0_state_id_handler::advertise_state_id(utils::UUID state_id) {
if (!_gossiper.is_enabled()) {
slogger.debug("Skipping advertisement of state id {} because gossiper is not active", state_id);
return make_ready_future();
}
if (_state_id_last_advertised && utils::timeuuid_tri_compare(_state_id_last_advertised, state_id) > 0) {
slogger.debug("Skipping advertisement of stale state id {}", state_id);
return make_ready_future();
}
_state_id_last_advertised = state_id;
const auto gc_time = to_gc_clock(db_clock::time_point(utils::UUID_gen::unix_timestamp(state_id)));
slogger.debug("Advertising state id: {} (gc_time: {})", state_id, gc_time);
return _gossiper.add_local_application_state(gms::application_state::GROUP0_STATE_ID, gms::versioned_value::state_id(service::state_id(state_id)));
}
} // namespace service