messaging_service: introduce the tenant concept

Tenants get their own connections for statement verbs and are further
isolated from each other by different scheduling groups. A tenant is
identified by a scheduling group and a name. When selecting the client
index for a statement verb, we look up the tenant whose scheduling group
matches the current one. This scheduling group is persisted across the
RPC call, using the name to identify the tenant on the remote end, where
a reverse lookup (name -> scheduling group) happens.

Instead of a single scheduling group to be used for all statement verbs,
messaging_service::scheduling_config now contains a list of tenants. The
first among these is the default tenant, the one we use when the current
scheduling group doesn't match that of any configured tenant.
To make this mapping easier, we reshuffle the client index assignment,
so that the statement and statement-ack verbs get client indexes 2 and 3
respectively, instead of 0 and 3.

The tenant configuration is set at messaging service construction
time and cannot be changed afterwards. Adding such capability should be easy
but is not needed for query classification, the current user of the
tenant concept.

Currently two tenants are configured: $user (default tenant) and
$system.
This commit is contained in:
Botond Dénes
2020-05-20 15:38:40 +03:00
parent db8974fef3
commit 16d8cdadc9
3 changed files with 77 additions and 29 deletions

View File

@@ -111,7 +111,7 @@ void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
// Delay listening messaging_service until gossip message handlers are registered
netw::messaging_service::memory_config mcfg = { std::max<size_t>(0.08 * available_memory, 1'000'000) };
netw::messaging_service::scheduling_config scfg;
scfg.statement = scheduling_config.statement;
scfg.statement_tenants = { {scheduling_config.statement, "$user"}, {default_scheduling_group(), "$system"} };
scfg.streaming = scheduling_config.streaming;
scfg.gossip = scheduling_config.gossip;
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba).get();

View File

@@ -279,7 +279,7 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
// Convenience constructor (tests/localhost): no encryption, no compression,
// TCP_NODELAY everywhere, a 1MB memory budget and a single "$default"
// statement tenant using the default-constructed scheduling group.
// NOTE(review): the stripped diff left both the old and new argument-list
// lines here; only the new one (with the tenant-list scheduling_config) is kept.
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
    : messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000},
        scheduling_config{{{{}, "$default"}}, {}, {}}, false)
{}
static
@@ -391,6 +391,7 @@ messaging_service::messaging_service(gms::inet_address ip
, _should_listen_to_broadcast_address(sltba)
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
, _clients(2 + scfg.statement_tenants.size() * 2)
, _mcfg(mcfg)
, _scheduling_config(scfg)
, _scheduling_info_for_connection_index(initial_scheduling_info())
@@ -402,6 +403,11 @@ messaging_service::messaging_service(gms::inet_address ip
ci.attach_auxiliary("max_result_size", max_result_size.value_or(query::result_memory_limiter::maximum_result_size));
return rpc::no_wait;
});
_connection_index_for_tenant.reserve(_scheduling_config.statement_tenants.size());
for (unsigned i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
_connection_index_for_tenant.push_back({_scheduling_config.statement_tenants[i].sched_group, i});
}
}
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
@@ -455,24 +461,6 @@ rpc::no_wait_type messaging_service::no_wait() {
static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
switch (verb) {
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:
case messaging_verb::READ_DATA:
case messaging_verb::READ_MUTATION_DATA:
case messaging_verb::READ_DIGEST:
case messaging_verb::GOSSIP_DIGEST_ACK:
case messaging_verb::DEFINITIONS_UPDATE:
case messaging_verb::TRUNCATE:
case messaging_verb::MIGRATION_REQUEST:
case messaging_verb::SCHEMA_CHECK:
case messaging_verb::COUNTER_MUTATION:
// Use the same RPC client for light weight transaction
// protocol steps as for standard mutations and read requests.
case messaging_verb::PAXOS_PREPARE:
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
return 0;
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
// sent on a different connection to avoid potential deadlocks
// as well as reduce latency as there are potentially many requests
@@ -482,7 +470,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GOSSIP_SHUTDOWN:
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GET_SCHEMA_VERSION:
return 1;
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
case messaging_verb::STREAM_MUTATION:
@@ -505,6 +493,24 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
case messaging_verb::HINT_MUTATION:
return 1;
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:
case messaging_verb::READ_DATA:
case messaging_verb::READ_MUTATION_DATA:
case messaging_verb::READ_DIGEST:
case messaging_verb::GOSSIP_DIGEST_ACK:
case messaging_verb::DEFINITIONS_UPDATE:
case messaging_verb::TRUNCATE:
case messaging_verb::MIGRATION_REQUEST:
case messaging_verb::SCHEMA_CHECK:
case messaging_verb::COUNTER_MUTATION:
// Use the same RPC client for light weight transaction
// protocol steps as for standard mutations and read requests.
case messaging_verb::PAXOS_PREPARE:
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
@@ -524,18 +530,41 @@ static constexpr std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)>
static std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)> s_rpc_client_idx_table = make_rpc_client_idx_table();
static unsigned get_rpc_client_idx(messaging_verb verb) {
return s_rpc_client_idx_table[static_cast<size_t>(verb)];
// Map a verb to the RPC client (connection) index to use for it.
// Non-statement verbs have fixed indexes (< 2). Statement and statement-ack
// verbs (base indexes 2 and 3) are shifted to the slot pair belonging to the
// tenant whose scheduling group matches the current one; if no tenant
// matches, the default tenant's slots (2 and 3) are used.
unsigned
messaging_service::get_rpc_client_idx(messaging_verb verb) const {
    const auto base_idx = s_rpc_client_idx_table[static_cast<size_t>(verb)];
    if (base_idx < 2) {
        // Gossip/streaming-class verb: fixed connection index.
        return base_idx;
    }
    // Statement or statement-ack verb: pick the slot pair of the matching tenant.
    const auto curr_sched_group = current_scheduling_group();
    for (unsigned tenant = 0; tenant < _connection_index_for_tenant.size(); ++tenant) {
        if (_connection_index_for_tenant[tenant].sched_group == curr_sched_group) {
            // tenant == 0 is the default tenant, keeping the base indexes 2 and 3.
            return base_idx + tenant * 2;
        }
    }
    // Unknown scheduling group: fall back to the default tenant's slots.
    return base_idx;
}
// Build the per-connection-index scheduling info table.
// Index layout must mirror get_rpc_client_idx(): 0 = gossip, 1 = streaming,
// then one (statement, statement-ack) pair per configured tenant starting at 2.
// NOTE(review): the stripped diff interleaved the pre-change "statement"/
// "statement-ack" entries with the new body; this is the reconstructed new
// implementation, with the tenant entries generated by the loop below.
std::vector<messaging_service::scheduling_info_for_connection_index>
messaging_service::initial_scheduling_info() const {
    if (_scheduling_config.statement_tenants.empty()) {
        throw std::runtime_error("messaging_service::initial_scheduling_info(): must have at least one tenant configured");
    }
    auto sched_infos = std::vector<scheduling_info_for_connection_index>({
        { _scheduling_config.gossip, "gossip" },
        { _scheduling_config.streaming, "streaming" },
    });
    // Two connections (statement + statement-ack) per tenant.
    sched_infos.reserve(sched_infos.size() + _scheduling_config.statement_tenants.size() * 2);
    for (const auto& tenant : _scheduling_config.statement_tenants) {
        sched_infos.push_back({ tenant.sched_group, "statement:" + tenant.name });
        sched_infos.push_back({ tenant.sched_group, "statement-ack:" + tenant.name });
    }
    return sched_infos;
}
scheduling_group
@@ -667,7 +696,10 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}
opts.tcp_nodelay = must_tcp_nodelay;
opts.reuseaddr = true;
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
// We send cookies only for non-default statement tenant clients.
if (idx > 3) {
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
}
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),

View File

@@ -219,7 +219,17 @@ public:
};
// Scheduling configuration for the messaging service, fixed at construction.
// NOTE(review): the stripped diff retained the pre-change
// `scheduling_group statement;` member alongside its replacement; per the
// commit description, statement_tenants supersedes it, so it is dropped here.
struct scheduling_config {
    // A tenant couples a scheduling group with a name. Statement verbs run on
    // the connections of the tenant whose scheduling group matches the current
    // one; the name identifies the tenant on the remote end.
    struct tenant {
        scheduling_group sched_group;
        sstring name;
    };
    // Must have at least one element. No two tenants should have the same
    // scheduling group. [0] is the default tenant, that all unknown
    // scheduling groups will fall back to. The default tenant should use
    // the statement scheduling group, for backward compatibility. In fact,
    // any other scheduling group would be dropped, as the default tenant
    // does not transfer its scheduling group across the wire.
    std::vector<tenant> statement_tenants;
    scheduling_group streaming;
    scheduling_group gossip;
};
@@ -229,6 +239,10 @@ private:
scheduling_group sched_group;
sstring isolation_cookie;
};
// Maps a tenant's scheduling group to its first statement client index.
struct tenant_connection_index {
// Scheduling group identifying the tenant (see scheduling_config::tenant).
scheduling_group sched_group;
// NOTE(review): 'cliend_idx' looks like a typo for 'client_idx'; the field is
// never read in the visible code (get_rpc_client_idx recomputes the offset
// from the loop index) — confirm unseen uses before renaming or removing.
unsigned cliend_idx;
};
private:
gms::inet_address _listen_address;
uint16_t _port;
@@ -244,13 +258,14 @@ private:
::shared_ptr<seastar::tls::server_credentials> _credentials;
std::unique_ptr<seastar::tls::credentials_builder> _credentials_builder;
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
std::array<clients_map, 4> _clients;
std::vector<clients_map> _clients;
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
bool _stopping = false;
std::list<std::function<void(gms::inet_address ep)>> _connection_drop_notifiers;
memory_config _mcfg;
scheduling_config _scheduling_config;
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
std::vector<tenant_connection_index> _connection_index_for_tenant;
public:
using clock_type = lowres_clock;
public:
@@ -532,6 +547,7 @@ public:
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
scheduling_group scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const;
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
unsigned get_rpc_client_idx(messaging_verb verb) const;
};
extern distributed<messaging_service> _the_messaging_service;