messaging_service: supply and interpret rpc isolation_cookies
On the client side, we supply an isolation cookie based on the connection index On the server side, we convert an isolation cookie back to a scheduling_group. This has two advantages: - rpc processes the entire connection using the scheduling group, so that code is also isolated and accounted for - we can later add per-user connections; the previous approach of looking at the verb to decide the scheduling_group doesn't help because we don't have a set of verbs per user With this, the main group sees <0.1% usage under simple read and write loads.
This commit is contained in:
@@ -321,6 +321,11 @@ void messaging_service::do_start_listen() {
|
||||
// local or remote datacenter, and whether or not the connection will be used for gossip. We can fix
|
||||
// the first by wrapping its server_socket, but not the second.
|
||||
auto limits = rpc_resource_limits(_mcfg.rpc_memory_limit);
|
||||
limits.isolate_connection = [this] (sstring isolation_cookie) {
|
||||
rpc::isolation_config cfg;
|
||||
cfg.sched_group = scheduling_group_for_isolation_cookie(isolation_cookie);
|
||||
return cfg;
|
||||
};
|
||||
if (!_server[0]) {
|
||||
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
|
||||
so.streaming_domain = sdomain;
|
||||
@@ -523,10 +528,10 @@ static unsigned get_rpc_client_idx(messaging_verb verb) {
|
||||
}
|
||||
|
||||
const messaging_service::scheduling_info_for_connection_index messaging_service::_scheduling_info_for_connection_index[4] = {
|
||||
{ &scheduling_config::statement },
|
||||
{ &scheduling_config::gossip },
|
||||
{ &scheduling_config::streaming },
|
||||
{ &scheduling_config::statement },
|
||||
{ &scheduling_config::statement, "statement" },
|
||||
{ &scheduling_config::gossip, "gossip" },
|
||||
{ &scheduling_config::streaming, "streaming", },
|
||||
{ &scheduling_config::statement, "statement-ack" },
|
||||
};
|
||||
|
||||
scheduling_group
|
||||
@@ -534,6 +539,20 @@ messaging_service::scheduling_group_for_verb(messaging_verb verb) const {
|
||||
return _scheduling_config.*(_scheduling_info_for_connection_index[get_rpc_client_idx(verb)].sched_group_ptr);
|
||||
}
|
||||
|
||||
scheduling_group
|
||||
messaging_service::scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const {
|
||||
// Once per connection, so a loop is fine.
|
||||
for (auto&& info : _scheduling_info_for_connection_index) {
|
||||
if (info.isolation_cookie == isolation_cookie) {
|
||||
return _scheduling_config.*(info.sched_group_ptr);
|
||||
}
|
||||
}
|
||||
// Client is using a new connection class that the server doesn't recognize yet.
|
||||
// Assume it's important, after server upgrade we'll recognize it.
|
||||
return default_scheduling_group();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get an IP for a given endpoint to connect to
|
||||
*
|
||||
@@ -644,6 +663,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
}
|
||||
opts.tcp_nodelay = must_tcp_nodelay;
|
||||
opts.reuseaddr = true;
|
||||
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
|
||||
auto client = must_encrypt ?
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
|
||||
|
||||
@@ -227,6 +227,7 @@ public:
|
||||
private:
|
||||
struct scheduling_info_for_connection_index {
|
||||
scheduling_group scheduling_config::*sched_group_ptr;
|
||||
sstring isolation_cookie;
|
||||
};
|
||||
private:
|
||||
gms::inet_address _listen_address;
|
||||
@@ -529,6 +530,7 @@ public:
|
||||
std::unique_ptr<rpc_protocol_wrapper>& rpc();
|
||||
static msg_addr get_source(const rpc::client_info& client);
|
||||
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
|
||||
scheduling_group scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const;
|
||||
};
|
||||
|
||||
extern distributed<messaging_service> _the_messaging_service;
|
||||
|
||||
Reference in New Issue
Block a user