diff --git a/CMakeLists.txt b/CMakeLists.txt index 02d663aa18..1c18999b9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -138,6 +138,7 @@ target_sources(scylla-main keys.cc multishard_mutation_query.cc mutation_query.cc + node_ops/task_manager_module.cc partition_slice_builder.cc querier.cc query.cc diff --git a/configure.py b/configure.py index d64c5fc5cc..4c2fd86910 100755 --- a/configure.py +++ b/configure.py @@ -1180,7 +1180,8 @@ scylla_core = (['message/messaging_service.cc', 'service/topology_state_machine.cc', 'service/topology_mutation.cc', 'service/topology_coordinator.cc', - 'node_ops/node_ops_ctl.cc' + 'node_ops/node_ops_ctl.cc', + 'node_ops/task_manager_module.cc', ] + [Antlr3Grammar('cql3/Cql.g')] \ + scylla_raft_core ) diff --git a/node_ops/task_manager_module.cc b/node_ops/task_manager_module.cc new file mode 100644 index 0000000000..90fd1ab737 --- /dev/null +++ b/node_ops/task_manager_module.cc @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "node_ops/task_manager_module.hh" +#include "service/storage_service.hh" +#include "tasks/task_handler.hh" + +namespace node_ops { + +tasks::task_manager::task_group node_ops_virtual_task::get_group() const noexcept { + return tasks::task_manager::task_group::topology_change_group; +} + +future> node_ops_virtual_task::get_ids() const { + return make_ready_future>(std::set{}); +} + +future node_ops_virtual_task::is_abortable() const { + return make_ready_future(tasks::is_abortable::no); +} + +future> node_ops_virtual_task::get_status(tasks::task_id id) { + return make_ready_future>(std::nullopt); +} + +future> node_ops_virtual_task::wait(tasks::task_id id) { + return make_ready_future>(std::nullopt); +} + +future<> node_ops_virtual_task::abort(tasks::task_id id) noexcept { + return make_ready_future(); +} + +future> node_ops_virtual_task::get_stats() { + return make_ready_future>(std::vector{}); +} + +task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept + : tasks::task_manager::module(tm, "node_ops") + , _ss(ss) +{} + +std::set task_manager_module::get_nodes() const noexcept { + return boost::copy_range>( + boost::join( + _ss._topology_state_machine._topology.normal_nodes, + _ss._topology_state_machine._topology.transition_nodes + ) | boost::adaptors::transformed([&ss = _ss] (auto& node) { + return ss.host2ip(locator::host_id{node.first.uuid()}); + }) + ); +} + +} diff --git a/node_ops/task_manager_module.hh b/node_ops/task_manager_module.hh new file mode 100644 index 0000000000..7e7ea421c0 --- /dev/null +++ b/node_ops/task_manager_module.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "streaming/stream_reason.hh" +#include "tasks/task_manager.hh" + +namespace service { +class storage_service; +} + +namespace node_ops { + +class node_ops_virtual_task : public tasks::task_manager::virtual_task::impl { +private: + service::storage_service& _ss; +public: + node_ops_virtual_task(tasks::task_manager::module_ptr module, + service::storage_service& ss) + : tasks::task_manager::virtual_task::impl(std::move(module)) + , _ss(ss) + {} + virtual tasks::task_manager::task_group get_group() const noexcept override; + virtual future> get_ids() const override; + virtual future is_abortable() const override; + + virtual future> get_status(tasks::task_id id) override; + virtual future> wait(tasks::task_id id) override; + virtual future<> abort(tasks::task_id id) noexcept override; + virtual future> get_stats() override; +}; + +class task_manager_module : public tasks::task_manager::module { +private: + service::storage_service& _ss; +public: + task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept; + + virtual std::set get_nodes() const noexcept override; +}; + +} diff --git a/service/storage_service.hh b/service/storage_service.hh index e261511c96..d9ffef24cc 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -90,6 +90,10 @@ class gossiper; class loaded_endpoint_state; }; +namespace node_ops { +class task_manager_module; +} + namespace service { class storage_service; @@ -955,6 +959,7 @@ private: abort_source _group0_as; friend class join_node_rpc_handshaker; + friend class node_ops::task_manager_module; }; } diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index a33702f9d3..4b42ece78d 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -104,6 +104,7 @@ public: enum class task_group { // Each virtual task needs to have its group. + topology_change_group, }; class task : public enable_lw_shared_from_this {