messaging_service: supply and interpret rpc isolation_cookies

On the client side, we supply an isolation cookie based on the connection index
On the server side, we convert an isolation cookie back to a scheduling_group.

This has two advantages:
 - rpc processes the entire connection using the scheduling group, so that code
   is also isolated and accounted for
 - we can later add per-user connections; the previous approach of looking at the
   verb to decide the scheduling_group doesn't help because we don't have a set of
   verbs per user

With this, the main group sees <0.1% usage under simple read and write loads.
This commit is contained in:
Avi Kivity
2018-10-28 14:08:56 +02:00
committed by Botond Dénes
parent dbce57fa3c
commit 10dd08c9b0
2 changed files with 26 additions and 4 deletions

View File

@@ -321,6 +321,11 @@ void messaging_service::do_start_listen() {
// local or remote datacenter, and whether or not the connection will be used for gossip. We can fix
// the first by wrapping its server_socket, but not the second.
auto limits = rpc_resource_limits(_mcfg.rpc_memory_limit);
limits.isolate_connection = [this] (sstring isolation_cookie) {
rpc::isolation_config cfg;
cfg.sched_group = scheduling_group_for_isolation_cookie(isolation_cookie);
return cfg;
};
if (!_server[0]) {
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
so.streaming_domain = sdomain;
@@ -523,10 +528,10 @@ static unsigned get_rpc_client_idx(messaging_verb verb) {
}
const messaging_service::scheduling_info_for_connection_index messaging_service::_scheduling_info_for_connection_index[4] = {
{ &scheduling_config::statement },
{ &scheduling_config::gossip },
{ &scheduling_config::streaming },
{ &scheduling_config::statement },
{ &scheduling_config::statement, "statement" },
{ &scheduling_config::gossip, "gossip" },
{ &scheduling_config::streaming, "streaming", },
{ &scheduling_config::statement, "statement-ack" },
};
scheduling_group
@@ -534,6 +539,20 @@ messaging_service::scheduling_group_for_verb(messaging_verb verb) const {
return _scheduling_config.*(_scheduling_info_for_connection_index[get_rpc_client_idx(verb)].sched_group_ptr);
}
scheduling_group
messaging_service::scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const {
// Once per connection, so a loop is fine.
for (auto&& info : _scheduling_info_for_connection_index) {
if (info.isolation_cookie == isolation_cookie) {
return _scheduling_config.*(info.sched_group_ptr);
}
}
// Client is using a new connection class that the server doesn't recognize yet.
// Assume it's important, after server upgrade we'll recognize it.
return default_scheduling_group();
}
/**
* Get an IP for a given endpoint to connect to
*
@@ -644,6 +663,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}
opts.tcp_nodelay = must_tcp_nodelay;
opts.reuseaddr = true;
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),

View File

@@ -227,6 +227,7 @@ public:
private:
struct scheduling_info_for_connection_index {
scheduling_group scheduling_config::*sched_group_ptr;
sstring isolation_cookie;
};
private:
gms::inet_address _listen_address;
@@ -529,6 +530,7 @@ public:
std::unique_ptr<rpc_protocol_wrapper>& rpc();
static msg_addr get_source(const rpc::client_info& client);
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
scheduling_group scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const;
};
extern distributed<messaging_service> _the_messaging_service;