messaging: tag RPC services with scheduling groups
Assign a scheduling_group for each RPC service. Assignement is done by connection (get_rpc_client_idx()) - all verbs on the same connection are assigned the same group. While this may seem arbitrary, it avoids priority inversion; if two verbs on the same connection have different scheduling groups, the verb with the low shares may cause a backlog and stall the connection, including following requests from verbs that ought to have higher shares. The scheduling_group parameters are encapsulated in different classes as they are passed around to avoid adding dependencies. Message-Id: <20180708140433.6426-1-avi@scylladb.com>
This commit is contained in:
committed by
Tomasz Grabiec
parent
cf7b42619d
commit
8c993e0728
7
init.cc
7
init.cc
@@ -53,6 +53,7 @@ void init_ms_fd_gossiper(sstring listen_address_in
|
||||
, sstring ms_compress
|
||||
, db::seed_provider_type seed_provider
|
||||
, size_t available_memory
|
||||
, init_scheduling_config scheduling_config
|
||||
, sstring cluster_name
|
||||
, double phi
|
||||
, bool sltba)
|
||||
@@ -111,7 +112,11 @@ void init_ms_fd_gossiper(sstring listen_address_in
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
bool listen_now = false;
|
||||
netw::messaging_service::memory_config mcfg = { std::max<size_t>(0.08 * available_memory, 1'000'000) };
|
||||
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, sltba, listen_now).get();
|
||||
netw::messaging_service::scheduling_config scfg;
|
||||
scfg.statement = scheduling_config.statement;
|
||||
scfg.streaming = scheduling_config.streaming;
|
||||
scfg.gossip = scheduling_config.gossip;
|
||||
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba, listen_now).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return netw::get_messaging_service().stop(); });
|
||||
|
||||
8
init.hh
8
init.hh
@@ -38,6 +38,13 @@ extern logging::logger startlog;
|
||||
class bad_configuration_error : public std::exception {};
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
|
||||
|
||||
struct init_scheduling_config {
|
||||
scheduling_group streaming;
|
||||
scheduling_group statement;
|
||||
scheduling_group gossip;
|
||||
};
|
||||
|
||||
void init_ms_fd_gossiper(sstring listen_address
|
||||
, uint16_t storage_port
|
||||
, uint16_t ssl_storage_port
|
||||
@@ -51,6 +58,7 @@ void init_ms_fd_gossiper(sstring listen_address
|
||||
, sstring ms_compress
|
||||
, db::seed_provider_type seed_provider
|
||||
, size_t available_memory
|
||||
, init_scheduling_config scheduling_config
|
||||
, sstring cluster_name = "Test Cluster"
|
||||
, double phi = 8
|
||||
, bool sltba = false);
|
||||
|
||||
5
main.cc
5
main.cc
@@ -588,6 +588,10 @@ int main(int ac, char** av) {
|
||||
cluster_name = "Test Cluster";
|
||||
startlog.warn("Using default cluster name is not recommended. Using a unique cluster name will reduce the chance of adding nodes to the wrong cluster by mistake");
|
||||
}
|
||||
init_scheduling_config scfg;
|
||||
scfg.statement = dbcfg.statement_scheduling_group;
|
||||
scfg.streaming = dbcfg.streaming_scheduling_group;
|
||||
scfg.gossip = scheduling_group();
|
||||
init_ms_fd_gossiper(listen_address
|
||||
, storage_port
|
||||
, ssl_storage_port
|
||||
@@ -601,6 +605,7 @@ int main(int ac, char** av) {
|
||||
, cfg->internode_compression()
|
||||
, seed_provider
|
||||
, memory::stats().total_memory()
|
||||
, scfg
|
||||
, cluster_name
|
||||
, phi
|
||||
, cfg->listen_on_broadcast_address());
|
||||
|
||||
@@ -236,11 +236,12 @@ bool messaging_service::knows_version(const gms::inet_address& endpoint) const {
|
||||
// Register a handler (a callback lambda) for verb
|
||||
template <typename Func>
|
||||
void register_handler(messaging_service* ms, messaging_verb verb, Func&& func) {
|
||||
ms->rpc()->register_handler(verb, std::move(func));
|
||||
ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::move(func));
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now)
|
||||
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000}, false, listen_now)
|
||||
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000},
|
||||
scheduling_config{}, false, listen_now)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -310,6 +311,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, uint16_t ssl_port
|
||||
, std::shared_ptr<seastar::tls::credentials_builder> credentials
|
||||
, messaging_service::memory_config mcfg
|
||||
, scheduling_config scfg
|
||||
, bool sltba
|
||||
, bool listen_now)
|
||||
: _listen_address(ip)
|
||||
@@ -322,6 +324,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
|
||||
, _mcfg(mcfg)
|
||||
, _scheduling_config(scfg)
|
||||
{
|
||||
_rpc->set_logger([] (const sstring& log) {
|
||||
rpc_logger.info("{}", log);
|
||||
@@ -410,6 +413,17 @@ static unsigned get_rpc_client_idx(messaging_verb verb) {
|
||||
return idx;
|
||||
}
|
||||
|
||||
scheduling_group
|
||||
messaging_service::scheduling_group_for_verb(messaging_verb verb) const {
|
||||
static const scheduling_group scheduling_config::*idx_to_group[] = {
|
||||
&scheduling_config::statement,
|
||||
&scheduling_config::gossip,
|
||||
&scheduling_config::streaming,
|
||||
&scheduling_config::statement,
|
||||
};
|
||||
return _scheduling_config.*(idx_to_group[get_rpc_client_idx(verb)]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an IP for a given endpoint to connect to
|
||||
*
|
||||
|
||||
@@ -184,6 +184,12 @@ public:
|
||||
size_t rpc_memory_limit = 1'000'000;
|
||||
};
|
||||
|
||||
struct scheduling_config {
|
||||
scheduling_group statement;
|
||||
scheduling_group streaming;
|
||||
scheduling_group gossip;
|
||||
};
|
||||
|
||||
private:
|
||||
gms::inet_address _listen_address;
|
||||
uint16_t _port;
|
||||
@@ -203,7 +209,7 @@ private:
|
||||
bool _stopping = false;
|
||||
std::list<std::function<void(gms::inet_address ep)>> _connection_drop_notifiers;
|
||||
memory_config _mcfg;
|
||||
|
||||
scheduling_config _scheduling_config;
|
||||
public:
|
||||
using clock_type = lowres_clock;
|
||||
public:
|
||||
@@ -211,7 +217,7 @@ public:
|
||||
uint16_t port = 7000, bool listen_now = true);
|
||||
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, compress_what, tcp_nodelay_what,
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
|
||||
memory_config mcfg, bool sltba = false, bool listen_now = true);
|
||||
memory_config mcfg, scheduling_config scfg, bool sltba = false, bool listen_now = true);
|
||||
~messaging_service();
|
||||
public:
|
||||
void start_listen();
|
||||
@@ -360,6 +366,7 @@ public:
|
||||
void unregister_connection_drop_notifier(drop_notifier_handler h);
|
||||
std::unique_ptr<rpc_protocol_wrapper>& rpc();
|
||||
static msg_addr get_source(const rpc::client_info& client);
|
||||
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
|
||||
};
|
||||
|
||||
extern distributed<messaging_service> _the_messaging_service;
|
||||
|
||||
Reference in New Issue
Block a user