storage_service: topology coordinator: introduce sstable cleanup fiber

Introduce a fiber that waits on a topology event and, when it sees that
the node it runs on needs to perform sstable cleanup, initiates one
for each non-tablet, non-local table and resets the "cleanup" flag back
to "clean" in the topology.
This commit is contained in:
Gleb Natapov
2023-10-25 18:34:07 +03:00
parent 5b246920ae
commit f70c4127c6
2 changed files with 105 additions and 1 deletion

View File

@@ -22,6 +22,7 @@
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/consistency_level.hh"
#include "seastar/core/when_all.hh"
#include "service/tablet_allocator.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
@@ -1042,6 +1043,104 @@ topology_node_mutation_builder& topology_mutation_builder::with_node(raft::serve
return *_node_builder;
}
// Background fiber that performs sstable cleanup on this node on demand.
// It sleeps on the topology state machine event until this node's cleanup
// status becomes "running", runs a cleanup compaction for every
// non-tablet, non-system keyspace, and then resets the node's cleanup
// flag back to "clean" in the topology via a group0 command.
// Loops until _group0_as requests an abort (shutdown).
future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
while (!_group0_as.abort_requested()) {
// Set when an unexpected error escapes this iteration; triggers a short
// back-off before the next attempt.
bool err = false;
try {
// Block until the topology says this node must run cleanup.
co_await _topology_state_machine.event.when([&] {
auto me = _topology_state_machine._topology.find(server.id());
return me && me->second.cleanup == cleanup_status::running;
});
std::vector<future<>> tasks;
// Starts a cleanup compaction task for the given tables of one keyspace
// and waits for it to finish; logs and rethrows on failure.
auto do_cleanup_ks = [this, &proxy] (sstring ks_name, std::vector<table_info> table_infos) -> future<> {
// Wait for all local writes to complete before cleanup
co_await proxy.invoke_on_all([] (storage_proxy& sp) -> future<> {
co_return co_await sp.await_pending_writes();
});
auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>({}, ks_name, _db, table_infos);
try {
co_return co_await task->done();
} catch (...) {
slogger.error("raft topology: cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception());
throw;
}
};
{
// The scope for the guard
auto guard = co_await _group0->client().start_operation(&_group0_as);
auto me = _topology_state_machine._topology.find(server.id());
// Recheck that cleanup is needed after the barrier
if (!me || me->second.cleanup != cleanup_status::running) {
slogger.trace("raft topology: cleanup triggered, but not needed");
continue;
}
slogger.info("raft topology: start cleanup");
auto keyspaces = _db.local().get_all_keyspaces();
tasks.reserve(keyspaces.size());
// Kick off one cleanup task per eligible keyspace; the task futures are
// collected in `tasks` and awaited below, after the guard is dropped.
co_await coroutine::parallel_for_each(keyspaces.begin(), keyspaces.end(), [this, &tasks, &do_cleanup_ks] (const sstring& ks_name) -> future<> {
auto ks = _db.local().find_keyspace(ks_name);
if (ks.get_replication_strategy().is_per_table() || is_system_keyspace(ks_name)) {
// Skip tablets tables since they do their own cleanup and system tables
// since they are local and not affected by range movements.
co_return;
}
const auto& cf_meta_data = ks.metadata().get()->cf_meta_data();
std::vector<table_info> table_infos;
table_infos.reserve(cf_meta_data.size());
for (const auto& [name, schema] : cf_meta_data) {
table_infos.emplace_back(table_info{name, schema->id()});
}
tasks.push_back(do_cleanup_ks(std::move(ks_name), std::move(table_infos)));
});
}
// Note that the guard is released while we are waiting for cleanup tasks to complete
co_await when_all_succeed(tasks.begin(), tasks.end()).discard_result();
slogger.info("raft topology: cleanup ended");
// Persist completion: flip this node's cleanup status back to "clean" in
// the topology, retrying if another group0 write races with ours.
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
slogger.info("raft topology: cleanup flag clearing: concurrent operation is detected, retrying.");
continue;
}
break;
}
slogger.debug("raft topology: cleanup flag cleared");
} catch (const seastar::abort_requested_exception &) {
// Normal shutdown path: stop the fiber.
slogger.info("raft topology: cleanup fiber aborted");
break;
} catch (raft::request_aborted&) {
// A group0/raft operation was aborted (also shutdown): stop the fiber.
slogger.info("raft topology: cleanup fiber aborted");
break;
} catch (...) {
slogger.error("raft topology: cleanup fiber got an error: {}", std::current_exception());
err = true;
}
if (err) {
// Back off briefly before retrying after an unexpected error; the
// sleep itself is abortable so shutdown is not delayed.
co_await sleep_abortable(std::chrono::seconds(1), _group0_as);
}
}
}
// Handler invoked to execute a raft topology command. Receives the raft
// term, a uint64_t sequence value (presumably a command/request index —
// verify against callers), and the command; yields the command's result.
using raft_topology_cmd_handler_type = noncopyable_function<future<raft_topology_cmd_result>(
    raft::term_t, uint64_t, const raft_topology_cmd&)>;
@@ -3519,6 +3618,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// start topology coordinator fiber
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
// start cleanup fiber
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy);
// Need to start system_distributed_keyspace before bootstrap because bootstrapping
// process may access those tables.
@@ -4723,7 +4824,7 @@ future<> storage_service::stop() {
// Stops all group0-related background work: requests an abort on the shared
// abort source, breaks the topology event to wake any fiber sleeping on it,
// and waits for both background fibers to finish.
future<> storage_service::wait_for_group0_stop() {
    _group0_as.request_abort();
    _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
    // Await both fibers together; each future is consumed (moved) exactly once.
    // (The diff residue showed a stale standalone `co_await std::move(_raft_state_monitor);`
    // before this line — awaiting it twice would co_await a moved-from future.)
    co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_cleanup_fiber));
}
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {

View File

@@ -805,6 +805,9 @@ private:
shared_promise<> _join_node_response_done;
semaphore _join_node_response_handler_mutex{1};
// Tracks the background sstable cleanup fiber; starts as a ready future and
// is replaced when the fiber is launched. Awaited during group0 shutdown.
future<> _sstable_cleanup_fiber = make_ready_future<>();
// The cleanup fiber itself: waits for the topology to mark this node as
// needing sstable cleanup and performs it (see definition in the .cc file).
future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
abort_source _group0_as;