messaging: tag RPC services with scheduling groups

Assign a scheduling_group for each RPC service. Assignement is
done by connection (get_rpc_client_idx()) - all verbs on the
same connection are assigned the same group. While this may seem
arbitrary, it avoids priority inversion; if two verbs on the same
connection have different scheduling groups, the verb with the low
shares may cause a backlog and stall the connection, including
following requests from verbs that ought to have higher shares.

The scheduling_group parameters are encapsulated in different
classes as they are passed around to avoid adding dependencies.
Message-Id: <20180708140433.6426-1-avi@scylladb.com>
This commit is contained in:
Avi Kivity
2018-07-08 17:04:33 +03:00
committed by Tomasz Grabiec
parent cf7b42619d
commit 8c993e0728
5 changed files with 44 additions and 5 deletions

View File

@@ -53,6 +53,7 @@ void init_ms_fd_gossiper(sstring listen_address_in
, sstring ms_compress
, db::seed_provider_type seed_provider
, size_t available_memory
, init_scheduling_config scheduling_config
, sstring cluster_name
, double phi
, bool sltba)
@@ -111,7 +112,11 @@ void init_ms_fd_gossiper(sstring listen_address_in
// Delay listening messaging_service until gossip message handlers are registered
bool listen_now = false;
netw::messaging_service::memory_config mcfg = { std::max<size_t>(0.08 * available_memory, 1'000'000) };
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, sltba, listen_now).get();
netw::messaging_service::scheduling_config scfg;
scfg.statement = scheduling_config.statement;
scfg.streaming = scheduling_config.streaming;
scfg.gossip = scheduling_config.gossip;
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba, listen_now).get();
// #293 - do not stop anything
//engine().at_exit([] { return netw::get_messaging_service().stop(); });

View File

@@ -38,6 +38,13 @@ extern logging::logger startlog;
class bad_configuration_error : public std::exception {};
void init_storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
struct init_scheduling_config {
scheduling_group streaming;
scheduling_group statement;
scheduling_group gossip;
};
void init_ms_fd_gossiper(sstring listen_address
, uint16_t storage_port
, uint16_t ssl_storage_port
@@ -51,6 +58,7 @@ void init_ms_fd_gossiper(sstring listen_address
, sstring ms_compress
, db::seed_provider_type seed_provider
, size_t available_memory
, init_scheduling_config scheduling_config
, sstring cluster_name = "Test Cluster"
, double phi = 8
, bool sltba = false);

View File

@@ -588,6 +588,10 @@ int main(int ac, char** av) {
cluster_name = "Test Cluster";
startlog.warn("Using default cluster name is not recommended. Using a unique cluster name will reduce the chance of adding nodes to the wrong cluster by mistake");
}
init_scheduling_config scfg;
scfg.statement = dbcfg.statement_scheduling_group;
scfg.streaming = dbcfg.streaming_scheduling_group;
scfg.gossip = scheduling_group();
init_ms_fd_gossiper(listen_address
, storage_port
, ssl_storage_port
@@ -601,6 +605,7 @@ int main(int ac, char** av) {
, cfg->internode_compression()
, seed_provider
, memory::stats().total_memory()
, scfg
, cluster_name
, phi
, cfg->listen_on_broadcast_address());

View File

@@ -236,11 +236,12 @@ bool messaging_service::knows_version(const gms::inet_address& endpoint) const {
// Register a handler (a callback lambda) for verb
template <typename Func>
void register_handler(messaging_service* ms, messaging_verb verb, Func&& func) {
ms->rpc()->register_handler(verb, std::move(func));
ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::move(func));
}
messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now)
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000}, false, listen_now)
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000},
scheduling_config{}, false, listen_now)
{}
static
@@ -310,6 +311,7 @@ messaging_service::messaging_service(gms::inet_address ip
, uint16_t ssl_port
, std::shared_ptr<seastar::tls::credentials_builder> credentials
, messaging_service::memory_config mcfg
, scheduling_config scfg
, bool sltba
, bool listen_now)
: _listen_address(ip)
@@ -322,6 +324,7 @@ messaging_service::messaging_service(gms::inet_address ip
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
, _mcfg(mcfg)
, _scheduling_config(scfg)
{
_rpc->set_logger([] (const sstring& log) {
rpc_logger.info("{}", log);
@@ -410,6 +413,17 @@ static unsigned get_rpc_client_idx(messaging_verb verb) {
return idx;
}
scheduling_group
messaging_service::scheduling_group_for_verb(messaging_verb verb) const {
static const scheduling_group scheduling_config::*idx_to_group[] = {
&scheduling_config::statement,
&scheduling_config::gossip,
&scheduling_config::streaming,
&scheduling_config::statement,
};
return _scheduling_config.*(idx_to_group[get_rpc_client_idx(verb)]);
}
/**
* Get an IP for a given endpoint to connect to
*

View File

@@ -184,6 +184,12 @@ public:
size_t rpc_memory_limit = 1'000'000;
};
struct scheduling_config {
scheduling_group statement;
scheduling_group streaming;
scheduling_group gossip;
};
private:
gms::inet_address _listen_address;
uint16_t _port;
@@ -203,7 +209,7 @@ private:
bool _stopping = false;
std::list<std::function<void(gms::inet_address ep)>> _connection_drop_notifiers;
memory_config _mcfg;
scheduling_config _scheduling_config;
public:
using clock_type = lowres_clock;
public:
@@ -211,7 +217,7 @@ public:
uint16_t port = 7000, bool listen_now = true);
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what, compress_what, tcp_nodelay_what,
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
memory_config mcfg, bool sltba = false, bool listen_now = true);
memory_config mcfg, scheduling_config scfg, bool sltba = false, bool listen_now = true);
~messaging_service();
public:
void start_listen();
@@ -360,6 +366,7 @@ public:
void unregister_connection_drop_notifier(drop_notifier_handler h);
std::unique_ptr<rpc_protocol_wrapper>& rpc();
static msg_addr get_source(const rpc::client_info& client);
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
};
extern distributed<messaging_service> _the_messaging_service;