mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
node_ops: add task manager module and node_ops_virtual_task
Add task manager node ops module and node_ops_virtual_task. Some methods will be implemented in later patches.
This commit is contained in:
@@ -138,6 +138,7 @@ target_sources(scylla-main
|
||||
keys.cc
|
||||
multishard_mutation_query.cc
|
||||
mutation_query.cc
|
||||
node_ops/task_manager_module.cc
|
||||
partition_slice_builder.cc
|
||||
querier.cc
|
||||
query.cc
|
||||
|
||||
@@ -1180,7 +1180,8 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'service/topology_state_machine.cc',
|
||||
'service/topology_mutation.cc',
|
||||
'service/topology_coordinator.cc',
|
||||
'node_ops/node_ops_ctl.cc'
|
||||
'node_ops/node_ops_ctl.cc',
|
||||
'node_ops/task_manager_module.cc',
|
||||
] + [Antlr3Grammar('cql3/Cql.g')] \
|
||||
+ scylla_raft_core
|
||||
)
|
||||
|
||||
59
node_ops/task_manager_module.cc
Normal file
59
node_ops/task_manager_module.cc
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "node_ops/task_manager_module.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "tasks/task_handler.hh"
|
||||
|
||||
namespace node_ops {
|
||||
|
||||
tasks::task_manager::task_group node_ops_virtual_task::get_group() const noexcept {
|
||||
return tasks::task_manager::task_group::topology_change_group;
|
||||
}
|
||||
|
||||
future<std::set<tasks::task_id>> node_ops_virtual_task::get_ids() const {
|
||||
return make_ready_future<std::set<tasks::task_id>>(std::set<tasks::task_id>{});
|
||||
}
|
||||
|
||||
future<tasks::is_abortable> node_ops_virtual_task::is_abortable() const {
|
||||
return make_ready_future<tasks::is_abortable>(tasks::is_abortable::no);
|
||||
}
|
||||
|
||||
future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status(tasks::task_id id) {
|
||||
return make_ready_future<std::optional<tasks::task_status>>(std::nullopt);
|
||||
}
|
||||
|
||||
future<std::optional<tasks::task_status>> node_ops_virtual_task::wait(tasks::task_id id) {
|
||||
return make_ready_future<std::optional<tasks::task_status>>(std::nullopt);
|
||||
}
|
||||
|
||||
future<> node_ops_virtual_task::abort(tasks::task_id id) noexcept {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
|
||||
return make_ready_future<std::vector<tasks::task_stats>>(std::vector<tasks::task_stats>{});
|
||||
}
|
||||
|
||||
task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
|
||||
: tasks::task_manager::module(tm, "node_ops")
|
||||
, _ss(ss)
|
||||
{}
|
||||
|
||||
std::set<gms::inet_address> task_manager_module::get_nodes() const noexcept {
|
||||
return boost::copy_range<std::set<gms::inet_address>>(
|
||||
boost::join(
|
||||
_ss._topology_state_machine._topology.normal_nodes,
|
||||
_ss._topology_state_machine._topology.transition_nodes
|
||||
) | boost::adaptors::transformed([&ss = _ss] (auto& node) {
|
||||
return ss.host2ip(locator::host_id{node.first.uuid()});
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
48
node_ops/task_manager_module.hh
Normal file
48
node_ops/task_manager_module.hh
Normal file
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include "tasks/task_manager.hh"
|
||||
|
||||
namespace service {
|
||||
class storage_service;
|
||||
}
|
||||
|
||||
namespace node_ops {
|
||||
|
||||
class node_ops_virtual_task : public tasks::task_manager::virtual_task::impl {
|
||||
private:
|
||||
service::storage_service& _ss;
|
||||
public:
|
||||
node_ops_virtual_task(tasks::task_manager::module_ptr module,
|
||||
service::storage_service& ss)
|
||||
: tasks::task_manager::virtual_task::impl(std::move(module))
|
||||
, _ss(ss)
|
||||
{}
|
||||
virtual tasks::task_manager::task_group get_group() const noexcept override;
|
||||
virtual future<std::set<tasks::task_id>> get_ids() const override;
|
||||
virtual future<tasks::is_abortable> is_abortable() const override;
|
||||
|
||||
virtual future<std::optional<tasks::task_status>> get_status(tasks::task_id id) override;
|
||||
virtual future<std::optional<tasks::task_status>> wait(tasks::task_id id) override;
|
||||
virtual future<> abort(tasks::task_id id) noexcept override;
|
||||
virtual future<std::vector<tasks::task_stats>> get_stats() override;
|
||||
};
|
||||
|
||||
class task_manager_module : public tasks::task_manager::module {
|
||||
private:
|
||||
service::storage_service& _ss;
|
||||
public:
|
||||
task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
|
||||
|
||||
virtual std::set<gms::inet_address> get_nodes() const noexcept override;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -90,6 +90,10 @@ class gossiper;
|
||||
class loaded_endpoint_state;
|
||||
};
|
||||
|
||||
namespace node_ops {
|
||||
class task_manager_module;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
|
||||
class storage_service;
|
||||
@@ -955,6 +959,7 @@ private:
|
||||
abort_source _group0_as;
|
||||
|
||||
friend class join_node_rpc_handshaker;
|
||||
friend class node_ops::task_manager_module;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -104,6 +104,7 @@ public:
|
||||
|
||||
enum class task_group {
|
||||
// Each virtual task needs to have its group.
|
||||
topology_change_group,
|
||||
};
|
||||
|
||||
class task : public enable_lw_shared_from_this<task> {
|
||||
|
||||
Reference in New Issue
Block a user