Files
scylladb/api/raft.cc
Emil Maskovsky a9e985fcc9 raft: add the read barrier REST API
This allows triggering the read barrier directly via the API,
instead of relying on workarounds (like dropping a non-existent table).

The intended use-case is in the Scylla Manager, to make sure that
the database schema is up to date after the data has been backed up
and before attempting to backup the database schema.

The database schema in particular is backed up on just a single
node, which might not yet have a schema at least as new as the data
(data can be migrated to a newer schema, but not vice versa).

A read barrier issued on that node ensures that the node has
a schema that is at least as new as the data.

Closes #19213
2024-07-08 18:16:27 +02:00

123 lines
4.0 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <chrono>
#include <stdexcept>
#include <string>
#include <seastar/core/coroutine.hh>
#include "api/api-doc/raft.json.hh"
#include "service/raft/raft_group_registry.hh"
#include "log.hh"
using namespace seastar::httpd;
extern logging::logger apilog;
namespace api {
struct http_context;
namespace r = httpd::raft_json;
using namespace json;
namespace {
/// Builds the Raft operation timeout from the optional "timeout" query
/// parameter of an HTTP request. The parameter is a whole number of seconds.
///
/// \param req the incoming HTTP request.
/// \return a default-constructed raft_timeout when the parameter is missing
///         or empty; otherwise a raft_timeout whose deadline is
///         `lowres_clock::now()` plus the requested number of seconds.
/// \throws bad_param_exception when the value is not an integer, has trailing
///         characters, is not positive, or is too large to be represented as
///         a `lowres_clock` deadline.
::service::raft_timeout get_request_timeout(const http::request& req) {
    const auto timeout_str = req.get_query_param("timeout");
    if (timeout_str.empty()) {
        return ::service::raft_timeout{};
    }

    long long dur = 0;
    std::size_t parsed_chars = 0;
    try {
        dur = std::stoll(timeout_str, &parsed_chars);
    } catch (const std::invalid_argument&) {
        // Not a number at all: report it as a client error rather than
        // letting the standard-library exception escape the handler.
        throw bad_param_exception{"Timeout must be a positive number."};
    } catch (const std::out_of_range&) {
        // Does not fit in a long long.
        throw bad_param_exception{"Timeout must be a positive number."};
    }
    // std::stoll stops at the first non-digit; reject inputs such as "10abc"
    // or "1.5" instead of silently truncating them.
    if (parsed_chars != timeout_str.size() || dur <= 0) {
        throw bad_param_exception{"Timeout must be a positive number."};
    }

    // Guard the deadline computation against overflow of the clock's
    // representation for absurdly large second counts.
    const auto now = lowres_clock::now();
    const auto max_secs = std::chrono::duration_cast<std::chrono::seconds>(
            lowres_clock::time_point::max() - now).count();
    if (dur > max_secs) {
        throw bad_param_exception{"Timeout is too large."};
    }
    return ::service::raft_timeout{.value = now + std::chrono::seconds{dur}};
}
} // namespace
/// Registers the Raft REST API handlers (trigger_snapshot, get_leader_host,
/// read_barrier) on the given routes.
///
/// \param r       the HTTP routes to register the handlers on.
/// \param raft_gr the sharded Raft group registry the handlers operate on;
///                it is captured by reference, so it must outlive the
///                registered handlers (see unset_raft()).
void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_registry>& raft_gr) {
    // Trigger a snapshot of the Raft group given by the "group_id" path
    // parameter, on whichever shard(s) host a server for that group.
    r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
        raft::group_id gid{utils::UUID{req->get_path_param("group_id")}};
        auto timeout = get_request_timeout(*req);

        // Written from the per-shard lambdas below, hence atomic.
        std::atomic<bool> found_srv{false};
        co_await raft_gr.invoke_on_all([gid, timeout, &found_srv] (service::raft_group_registry& raft_gr) -> future<> {
            if (!raft_gr.find_server(gid)) {
                co_return;
            }

            found_srv = true;

            apilog.info("Triggering Raft group {} snapshot", gid);
            auto srv = raft_gr.get_server_with_timeouts(gid);
            auto result = co_await srv.trigger_snapshot(nullptr, timeout);
            if (result) {
                apilog.info("New snapshot for Raft group {} created", gid);
            } else {
                apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid);
            }
        });

        if (!found_srv) {
            throw bad_param_exception{fmt::format("Server for group ID {} not found", gid)};
        }

        co_return json_void{};
    });

    // Report the current leader of the group given by the optional "group_id"
    // query parameter (group 0 when absent), as seen by shard 0.
    r::get_leader_host.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
        // The request is parsed here, on the handling shard, and only plain
        // values are sent to shard 0: `req` is owned by this handler and must
        // not be referenced from the cross-shard task, which may run after a
        // non-coroutine handler would already have returned. The leader ID is
        // likewise returned by value and formatted on the handling shard.
        if (req->query_parameters.contains("group_id")) {
            const raft::group_id gid{utils::UUID{req->get_query_param("group_id")}};
            const auto leader = co_await raft_gr.invoke_on(0, [gid] (service::raft_group_registry& raft_gr) {
                return raft_gr.get_server(gid).current_leader();
            });
            co_return json_return_type(leader.to_sstring());
        }

        const auto leader = co_await raft_gr.invoke_on(0, [] (service::raft_group_registry& raft_gr) {
            return raft_gr.group0().current_leader();
        });
        co_return json_return_type(leader.to_sstring());
    });

    // Issue a read barrier on the group given by the optional "group_id"
    // query parameter (group 0 when absent).
    r::read_barrier.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
        auto timeout = get_request_timeout(*req);

        if (!req->query_parameters.contains("group_id")) {
            // Read barrier on group 0 by default
            co_await raft_gr.invoke_on(0, [timeout] (service::raft_group_registry& raft_gr) {
                return raft_gr.group0_with_timeouts().read_barrier(nullptr, timeout);
            });
            co_return json_void{};
        }

        raft::group_id gid{utils::UUID{req->get_query_param("group_id")}};

        // Written from the per-shard lambdas below, hence atomic.
        std::atomic<bool> found_srv{false};
        co_await raft_gr.invoke_on_all([gid, timeout, &found_srv] (service::raft_group_registry& raft_gr) {
            if (!raft_gr.find_server(gid)) {
                return make_ready_future<>();
            }
            found_srv = true;
            return raft_gr.get_server_with_timeouts(gid).read_barrier(nullptr, timeout);
        });

        if (!found_srv) {
            throw bad_param_exception{fmt::format("Server for group ID {} not found", gid)};
        }

        co_return json_void{};
    });
}
/// Removes the Raft REST API handlers (trigger_snapshot, get_leader_host,
/// read_barrier) from the given routes.
///
/// \param r the HTTP routes to remove the handlers from.
void unset_raft(http_context&, httpd::routes& r) {
    // Spelled with the full namespace rather than the `r` alias, which the
    // `r` parameter visually shadows inside this function.
    httpd::raft_json::trigger_snapshot.unset(r);
    httpd::raft_json::get_leader_host.unset(r);
    httpd::raft_json::read_barrier.unset(r);
}
}