Files
scylladb/test/manual/message.cc
Pavel Emelyanov 18b5a49b0c Populate all sl:* groups into dedicated top-level supergroup
This patch changes the layout of user-facing scheduling groups from

/
`- statement
`- sl:default
`- sl:*
`- other groups (compaction, streaming, etc.)

into

/
`- user (supergroup)
   `- statement
   `- sl:default
   `- sl:*
`- other groups (compaction, streaming, etc.)

The new supergroup has 1000 static shares and is name-less, in the sense
that it only has a variable in the code to refer to it and is not exported
via metrics (this should be fixed in seastar if we want to).

The moved groups don't change their names or shares, only move inside
the scheduling hierarchy.

The goal of the change is to improve resource consumption of sl:*
groups. Right now activities in low-shares service levels are scheduled
on-par with e.g. streaming activity, which is considered to be low-prio
one. Moving all sl:* groups into their own supergroup with 1000
shares changes the meaning of sl:* shares. From now on these share
values describe the priorities of service levels relative to each other,
and user activities compete with the rest of the system with 1000 shares,
regardless of how many service levels there are.

Unit tests keep their user groups under root supergroup (for simplicity)

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#28235
2026-01-21 14:14:48 +02:00

247 lines
11 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <chrono>
#include <optional>
#include <seastar/core/reactor.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/util/closeable.hh>
#include "db/config.hh"
#include "gms/feature_service.hh"
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
#include "gms/gossip_digest_ack2.hh"
#include "gms/gossip_digest.hh"
#include "api/api.hh"
#include "utils/UUID.hh"
#include "utils/log.hh"
#include "locator/token_metadata.hh"
#include "db/schema_tables.hh"
#include "idl/gossip.dist.hh"
#include "service/qos/service_level_controller.hh"
using namespace std::chrono_literals;
using namespace netw;
logging::logger test_logger("message_test");
// Test harness around a messaging_service instance. The server side
// (init_handler) registers handlers for the gossip RPC verbs; the client
// side (test_*) drives a gossip digest exchange, a shutdown message and an
// echo against a remote instance of this same binary.
class tester {
private:
    messaging_service& ms;
    gms::inet_address _server;  // peer address, set via set_server_ip()
    uint32_t _cpuid;            // peer shard, set via set_server_cpuid()
    // Fixed host id used to address the peer for verbs that take a host_id
    // (shutdown, echo). Must match the id the server registers under.
    locator::host_id _server_id = locator::host_id{utils::UUID("00000000-0000-1000-0000-000000000001")};
public:
    tester(netw::messaging_service& ms_) : ms(ms_) {}
    using msg_addr = netw::messaging_service::msg_addr;
    using inet_address = gms::inet_address;
    using endpoint_state = gms::endpoint_state;
    // Peer address (ip + shard) for verbs addressed by msg_addr.
    msg_addr get_msg_addr() {
        return msg_addr{_server, _cpuid};
    }
    void set_server_ip(sstring ip) {
        _server = inet_address(ip);
    }
    void set_server_cpuid(uint32_t cpu) {
        _cpuid = cpu;
    }
    future<> stop() {
        return make_ready_future<>();
    }
    // Resolved by the server-side ack handler once it has sent ack2,
    // i.e. when the full syn -> ack -> ack2 round trip has completed.
    promise<> digest_test_done;
    uint16_t port() const { return ms.port(); }
public:
    // Server side: register handlers for all gossip verbs on this shard.
    void init_handler() {
        ser::gossip_rpc_verbs::register_gossip_digest_syn(&ms, [this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) {
            test_logger.info("Server got syn msg = {}", msg);
            auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
            // Reply with a synthetic ack carrying two digests and the
            // corresponding endpoint states.
            auto ep1 = inet_address("1.1.1.1");
            auto ep2 = inet_address("2.2.2.2");
            gms::generation_type gen(800);
            gms::version_type ver(900);
            utils::chunked_vector<gms::gossip_digest> digests;
            digests.push_back(gms::gossip_digest(ep1, gen++, ver++));
            digests.push_back(gms::gossip_digest(ep2, gen++, ver++));
            std::map<inet_address, endpoint_state> eps{
                {ep1, endpoint_state(ep1)},
                {ep2, endpoint_state(ep2)},
            };
            gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
            // FIXME: discarded future.
            (void)ser::gossip_rpc_verbs::send_gossip_digest_ack(&ms, from, std::move(ack)).handle_exception([] (auto ep) {
                test_logger.error("Fail to send ack : {}", ep);
            });
            return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
        });
        ser::gossip_rpc_verbs::register_gossip_digest_ack(&ms, [this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) {
            test_logger.info("Server got ack msg = {}", msg);
            auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
            // Prepare gossip_digest_ack2 message
            auto ep1 = inet_address("3.3.3.3");
            std::map<inet_address, endpoint_state> eps{
                {ep1, endpoint_state(ep1)},
            };
            gms::gossip_digest_ack2 ack2(std::move(eps));
            // FIXME: discarded future.
            (void)ser::gossip_rpc_verbs::send_gossip_digest_ack2(&ms, from, std::move(ack2)).handle_exception([] (auto ep) {
                test_logger.error("Fail to send ack2 : {}", ep);
            });
            // The exchange reached ack; unblock test_gossip_digest().
            digest_test_done.set_value();
            return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
        });
        ser::gossip_rpc_verbs::register_gossip_digest_ack2(&ms, [] (const rpc::client_info& cinfo, gms::gossip_digest_ack2 msg) {
            test_logger.info("Server got ack2 msg = {}", msg);
            return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
        });
        ser::gossip_rpc_verbs::register_gossip_shutdown(&ms, [] (const rpc::client_info& cinfo, inet_address from, rpc::optional<int64_t> generation_number_opt) {
            test_logger.info("Server got shutdown msg = {}", from);
            return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
        });
        ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional<int64_t> gen_opt, rpc::optional<bool> notify_up) {
            test_logger.info("Server got gossip echo msg");
            // Deliberately throw so the client exercises its error path.
            throw std::runtime_error("I'm throwing runtime_error exception");
            return make_ready_future<>();
        });
    }
    // Unregister all gossip verb handlers registered by init_handler().
    future<> deinit_handler() {
        co_await ser::gossip_rpc_verbs::unregister(&ms);
        test_logger.info("tester deinit_handler done");
    }
public:
    // Client side: send a syn and wait until the server's ack handler on our
    // side has answered with ack2 (signalled through digest_test_done).
    future<> test_gossip_digest() {
        test_logger.info("=== {} ===", __func__);
        // Prepare gossip_digest_syn message
        auto id = get_msg_addr();
        auto ep1 = inet_address("1.1.1.1");
        auto ep2 = inet_address("2.2.2.2");
        gms::generation_type gen(100);
        gms::version_type ver(900);
        utils::chunked_vector<gms::gossip_digest> digests;
        digests.push_back(gms::gossip_digest(ep1, gen++, ver++));
        digests.push_back(gms::gossip_digest(ep2, gen++, ver++));
        gms::gossip_digest_syn syn("my_cluster", "my_partition", digests, utils::null_uuid(), utils::null_uuid());
        return ser::gossip_rpc_verbs::send_gossip_digest_syn(&ms, id, std::move(syn)).then([this] {
            test_logger.info("Sent gossip digest syn. Waiting for digest_test_done...");
            return digest_test_done.get_future();
        });
    }
    // Client side: send a shutdown message addressed by the fixed host id.
    future<> test_gossip_shutdown() {
        test_logger.info("=== {} ===", __func__);
        inet_address from("127.0.0.1");
        int64_t gen = 0x1;
        return ser::gossip_rpc_verbs::send_gossip_shutdown(&ms, _server_id, from, gen).then([] () {
            test_logger.info("Client sent gossip_shutdown got reply = void");
            return make_ready_future<>();
        });
    }
    // Client side: send an echo; the server handler throws on purpose, so the
    // expected outcome is the error log below, not a successful reply.
    future<> test_echo() {
        test_logger.info("=== {} ===", __func__);
        int64_t gen = 0x1;
        abort_source as;
        try {
            co_await ser::gossip_rpc_verbs::send_gossip_echo(&ms, _server_id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), as, gen, false);
        } catch (...) {
            test_logger.error("test_echo: {}", std::current_exception());
        }
    }
};
namespace bpo = boost::program_options;
// Usage example: build/dev/test/manual/message --listen 127.0.0.1 --server 127.0.0.1
// Manual messaging test. Run one instance as a pure server (no --server
// option) and a second instance pointed at it; the client runs the gossip
// digest / shutdown / echo exchanges and then exits unless --stay-alive.
int main(int ac, char ** av) {
app_template app;
app.add_options()
("server", bpo::value<std::string>(), "Server ip")
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen")
("api-port", bpo::value<uint16_t>()->default_value(10000), "Http Rest API port")
("stay-alive", bpo::value<bool>()->default_value(false), "Do not kill the test server after the test")
("cpuid", bpo::value<uint32_t>()->default_value(0), "Server cpuid");
// Declared outside app.run_deprecated so they outlive the seastar thread.
sharded<replica::database> db;
sharded<auth::service> auth_service;
locator::shared_token_metadata tm({}, {});
sharded<qos::service_level_controller> sl_controller;
return app.run_deprecated(ac, av, [&app, &auth_service, &tm, &sl_controller] {
return seastar::async([&app, &auth_service, &tm, &sl_controller] {
auto config = app.configuration();
bool stay_alive = config["stay-alive"].as<bool>();
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
// When listening on the wildcard address, advertise "localhost" instead.
auto my_address = listen != gms::inet_address("0.0.0.0") ? listen : gms::inet_address("localhost");
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
sharded<locator::shared_token_metadata> token_metadata;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_tm = deferred_stop(token_metadata);
auto default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
sharded<abort_source> as;
as.start().get();
auto stop_as = defer([&as] { as.stop().get(); });
// Root supergroup is used here; the test does not exercise the user
// supergroup layout used in production.
sl_controller.start(std::ref(auth_service), std::ref(tm), std::ref(as), qos::service_level_options{.shares = 1000}, scheduling_supergroup(), default_scheduling_group).get();
seastar::sharded<netw::walltime_compressor_tracker> compressor_tracker;
compressor_tracker.start([] { return netw::walltime_compressor_tracker::config{}; }).get();
auto stop_compressor_tracker = deferred_stop(compressor_tracker);
seastar::sharded<gms::feature_service> feature_service;
gms::feature_config cfg;
feature_service.start(cfg).get();
seastar::sharded<gms::gossip_address_map> gossip_address_map;
gossip_address_map.start().get();
// Messaging service listens on the standard inter-node port 7000.
seastar::sharded<netw::messaging_service> messaging;
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service),
std::ref(gossip_address_map), gms::generation_type{}, std::ref(compressor_tracker),
std::ref(sl_controller)).get();
auto stop_messaging = deferred_stop(messaging);
seastar::sharded<tester> testers;
testers.start(std::ref(messaging)).get();
auto stop_testers = deferred_stop(testers);
auto port = testers.local().port();
test_logger.info("Messaging server listening on {} port {}", listen, port);
// Register gossip verb handlers on every shard before listening.
testers.invoke_on_all(&tester::init_handler).get();
auto deinit_testers = deferred_action([&testers] {
testers.invoke_on_all(&tester::deinit_handler).get();
});
messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata), [] (gms::inet_address ip){ return locator::host_id{}; }).get();
// Client mode: only when --server was given; otherwise act as server only.
if (config.contains("server")) {
auto ip = config["server"].as<std::string>();
auto cpuid = config["cpuid"].as<uint32_t>();
auto t = &testers.local();
t->set_server_ip(ip);
t->set_server_cpuid(cpuid);
test_logger.info("=============TEST START===========");
test_logger.info("Sending to server ....");
t->test_gossip_digest().get();
t->test_gossip_shutdown().get();
t->test_echo().get();
test_logger.info("=============TEST DONE===========");
}
// Keep the server process alive so other instances can connect to it.
while (stay_alive) {
seastar::sleep(1s).get();
}
}).finally([] {
// Hard exit: manual test tool, skip graceful reactor shutdown.
exit(0);
});
});
}