storage_service: fall back to local cleanup in cleanup_all

before this change, if no keyspaces are specified,
scylla-nodetool just enumerates all non-local keyspaces, and
calls "/storage_service/keyspace_cleanup" on them one after another.
this is not very efficient, as each such RESTful API call
forces a new active commitlog segment, and flushes all tables.
so, if the target node of this command has N non-local keyspaces,
it would repeat the steps above for N times. this is not necessary.
and after a topology change, we would like to run a global
"nodetool cleanup" without specifying the keyspace, so this
is a typical use case which we do care about.

to address this performance issue, in this change, we improve
an existing RESTful API call "/storage_service/cleanup_all", so
if the topology coordinator is not enabled, we fall back to
a local cleanup of all non-local keyspaces.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2024-01-19 16:30:51 +08:00
parent 4f90a875f6
commit 5e0b3671d3
5 changed files with 67 additions and 14 deletions

View File

@@ -802,7 +802,7 @@
"operations":[
{
"method":"POST",
"summary":"Trigger a global cluster cleanup through the topology coordinator",
"summary":"Trigger a global cleanup",
"type":"long",
"nickname":"cleanup_all",
"produces":[

View File

@@ -798,13 +798,28 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
co_return json::json_return_type(0);
});
ss::cleanup_all.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
ss::cleanup_all.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("cleanup_all");
auto done = co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
if (!ss.is_topology_coordinator_enabled()) {
throw std::runtime_error("Cannot run cleanup through the topology coordinator because it is disabled");
co_return false;
}
return ss.do_cluster_cleanup();
co_await ss.do_cluster_cleanup();
co_return true;
});
if (done) {
co_return json::json_return_type(0);
}
// fall back to the local global cleanup if topology coordinator is not enabled
auto& db = ctx.db;
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<global_cleanup_compaction_task_impl>({}, db);
try {
co_await task->done();
} catch (...) {
apilog.error("cleanup_all failed: {}", std::current_exception());
throw;
}
co_return json::json_return_type(0);
});

View File

@@ -7,6 +7,7 @@
*/
#include <boost/range/algorithm/min_element.hpp>
#include <seastar/coroutine/parallel_for_each.hh>
#include "compaction/task_manager_module.hh"
#include "compaction/compaction_manager.hh"
@@ -423,6 +424,35 @@ future<> cleanup_keyspace_compaction_task_impl::run() {
});
}
// Run a cleanup compaction of every non-local-strategy keyspace on every
// shard. Unlike issuing one keyspace_cleanup request per keyspace, all
// tables are flushed only once, up front, which is the point of this task.
future<> global_cleanup_compaction_task_impl::run() {
    co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
        // flush once per shard before cleaning any keyspace
        co_await db.flush_all_tables();
        const auto keyspaces = _db.local().get_non_local_strategy_keyspaces();
        co_await coroutine::parallel_for_each(keyspaces, [&] (const sstring& ks) -> future<> {
            const auto& keyspace = db.find_keyspace(ks);
            const auto& replication_strategy = keyspace.get_replication_strategy();
            if (replication_strategy.get_type() == locator::replication_strategy_type::local) {
                // this keyspace does not require cleanup
                co_return;
            }
            if (replication_strategy.uses_tablets()) {
                // this keyspace does not support cleanup
                co_return;
            }
            // collect all tables of this keyspace; reuse the keyspace we
            // already looked up instead of a second find_keyspace() call
            std::vector<table_info> tables;
            const auto& cf_meta_data = keyspace.metadata().get()->cf_meta_data();
            for (const auto& [name, schema] : cf_meta_data) {
                tables.emplace_back(name, schema->id());
            }
            // delegate the per-keyspace work to a child shard-cleanup task,
            // parented to this task's id/shard
            auto& module = db.get_compaction_manager().get_task_manager_module();
            const tasks::task_info task_info{_status.id, _status.shard};
            auto task = co_await module.make_and_start_task<shard_cleanup_keyspace_compaction_task_impl>(
                task_info, _status.keyspace, _status.id, db, std::move(tables));
            co_await task->done();
        });
    });
}
future<> shard_cleanup_keyspace_compaction_task_impl::run() {
seastar::condition_variable cv;
tasks::task_manager::task_ptr current_task;

View File

@@ -214,6 +214,22 @@ protected:
virtual future<> run() override;
};
// Task-manager task that performs a cleanup compaction of all
// non-local-strategy keyspaces on every shard. Used as the local
// fallback for /storage_service/cleanup_all when the cluster-wide
// (topology coordinator) cleanup is not available.
class global_cleanup_compaction_task_impl : public compaction_task_impl {
private:
// the sharded database the global cleanup operates on (non-owning reference)
sharded<replica::database>& _db;
public:
global_cleanup_compaction_task_impl(tasks::task_manager::module_ptr module,
sharded<replica::database>& db) noexcept
// scope is "global": no specific keyspace/table/entity, and no parent task
: compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "global", "", "", "", tasks::task_id::create_null_id())
, _db(db)
{}
std::string type() const final {
return "global cleanup compaction";
}
private:
// does the actual per-shard flush + per-keyspace cleanup (defined out of line)
future<> run() final;
};
class shard_cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
private:
replica::database& _db;

View File

@@ -173,15 +173,7 @@ void cleanup_operation(scylla_rest_client& client, const bpo::variables_map& vm)
}
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace), std::move(params));
} else {
auto res = client.post(format("/storage_service/cleanup_all"));
if (res.IsObject()) {
// If previous post returns an object instead of a simple value it means the request
// failed either because server does not support it yet or topology coordinator is disabled.
// Fall back to the old cleanup API.
for (const auto& keyspace : get_keyspaces(client, "non_local_strategy")) {
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace));
}
}
client.post("/storage_service/cleanup_all");
}
}