storage_service: fall back to local cleanup in cleanup_all
before this change, if no keyspaces are specified, scylla-nodetool just enumerate all non-local keyspaces, and call "/storage_service/keyspace_cleanup" on them one after another. this is not quite efficient, as each this RESTful API call force a new active commitlog segment, and flushes all tables. so, if the target node of this command has N non-local keyspaces, it would repeat the steps above for N times. this is not necessary. and after a topology change, we would like to run a global "nodetool cleanup" without specifying the keyspace, so this is a typical use case which we do care about. to address this performance issue, in this change, we improve an existing RESTful API call "/storage_service/cleanup_all", so if the topology coordinator is not enabled, we fall back to a local cleanup to cleanup all non-local keyspaces. Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
@@ -802,7 +802,7 @@
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Trigger a global cluster cleanup through the topology coordinator",
|
||||
"summary":"Trigger a global cleanup",
|
||||
"type":"long",
|
||||
"nickname":"cleanup_all",
|
||||
"produces":[
|
||||
|
||||
@@ -798,13 +798,28 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
co_return json::json_return_type(0);
|
||||
});
|
||||
|
||||
ss::cleanup_all.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
|
||||
ss::cleanup_all.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
apilog.info("cleanup_all");
|
||||
auto done = co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
|
||||
if (!ss.is_topology_coordinator_enabled()) {
|
||||
throw std::runtime_error("Cannot run cleanup through the topology coordinator because it is disabled");
|
||||
co_return false;
|
||||
}
|
||||
return ss.do_cluster_cleanup();
|
||||
co_await ss.do_cluster_cleanup();
|
||||
co_return true;
|
||||
});
|
||||
if (done) {
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
// fall back to the local global cleanup if topology coordinator is not enabled
|
||||
auto& db = ctx.db;
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<global_cleanup_compaction_task_impl>({}, db);
|
||||
try {
|
||||
co_await task->done();
|
||||
} catch (...) {
|
||||
apilog.error("cleanup_all failed: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
co_return json::json_return_type(0);
|
||||
});
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
#include <boost/range/algorithm/min_element.hpp>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
|
||||
#include "compaction/task_manager_module.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
@@ -423,6 +424,35 @@ future<> cleanup_keyspace_compaction_task_impl::run() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> global_cleanup_compaction_task_impl::run() {
|
||||
co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
co_await db.flush_all_tables();
|
||||
const auto keyspaces = _db.local().get_non_local_strategy_keyspaces();
|
||||
co_await coroutine::parallel_for_each(keyspaces, [&] (const sstring& ks) -> future<> {
|
||||
const auto& keyspace = db.find_keyspace(ks);
|
||||
const auto& replication_strategy = keyspace.get_replication_strategy();
|
||||
if (replication_strategy.get_type() == locator::replication_strategy_type::local) {
|
||||
// this keyspace does not require cleanup
|
||||
co_return;
|
||||
}
|
||||
if (replication_strategy.uses_tablets()) {
|
||||
// this keyspace does not support cleanup
|
||||
co_return;
|
||||
}
|
||||
std::vector<table_info> tables;
|
||||
const auto& cf_meta_data = db.find_keyspace(ks).metadata().get()->cf_meta_data();
|
||||
for (auto& [name, schema] : cf_meta_data) {
|
||||
tables.emplace_back(name, schema->id());
|
||||
}
|
||||
auto& module = db.get_compaction_manager().get_task_manager_module();
|
||||
const tasks::task_info task_info{_status.id, _status.shard};
|
||||
auto task = co_await module.make_and_start_task<shard_cleanup_keyspace_compaction_task_impl>(
|
||||
task_info, _status.keyspace, _status.id, db, std::move(tables));
|
||||
co_await task->done();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> shard_cleanup_keyspace_compaction_task_impl::run() {
|
||||
seastar::condition_variable cv;
|
||||
tasks::task_manager::task_ptr current_task;
|
||||
|
||||
@@ -214,6 +214,22 @@ protected:
|
||||
virtual future<> run() override;
|
||||
};
|
||||
|
||||
class global_cleanup_compaction_task_impl : public compaction_task_impl {
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
public:
|
||||
global_cleanup_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
sharded<replica::database>& db) noexcept
|
||||
: compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "global", "", "", "", tasks::task_id::create_null_id())
|
||||
, _db(db)
|
||||
{}
|
||||
std::string type() const final {
|
||||
return "global cleanup compaction";
|
||||
}
|
||||
private:
|
||||
future<> run() final;
|
||||
};
|
||||
|
||||
class shard_cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
|
||||
private:
|
||||
replica::database& _db;
|
||||
|
||||
@@ -173,15 +173,7 @@ void cleanup_operation(scylla_rest_client& client, const bpo::variables_map& vm)
|
||||
}
|
||||
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace), std::move(params));
|
||||
} else {
|
||||
auto res = client.post(format("/storage_service/cleanup_all"));
|
||||
if (res.IsObject()) {
|
||||
// If previous post returns an object instead of a simple value it means the request
|
||||
// failed either because server does not support it yet or topology coordinator is disabled.
|
||||
// Fall back to the old cleanup API.
|
||||
for (const auto& keyspace : get_keyspaces(client, "non_local_strategy")) {
|
||||
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace));
|
||||
}
|
||||
}
|
||||
client.post("/storage_service/cleanup_all");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user