messaging_service: introduce the tenant concept
Tenants get their own connections for statement verbs and are further isolated from each other by different scheduling groups. A tenant is identified by a scheduling group and a name. When selecting the client index for a statement verb, we look up the tenant whose scheduling group matches the current one. This scheduling group is persisted across the RPC call, using the name to identify the tenant on the remote end, where a reverse lookup (name -> scheduling group) happens. Instead of a single scheduling group to be used for all statement verbs, messaging_service::scheduling_config now contains a list of tenants. The first among these is the default tenant, the one we use when the current scheduling group doesn't match that of any configured tenant. To make this mapping easier, we reshuffle the client index assignment, such that statement and statement-ack verbs have the idx 2 and 3 respectively, instead of 0 and 3. The tenant configuration is configured at message service construction time and cannot be changed after. Adding such capability should be easy but is not needed for query classification, the current user of the tenant concept. Currently two tenants are configured: $user (default tenant) and $system.
This commit is contained in:
2
init.cc
2
init.cc
@@ -111,7 +111,7 @@ void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
netw::messaging_service::memory_config mcfg = { std::max<size_t>(0.08 * available_memory, 1'000'000) };
|
||||
netw::messaging_service::scheduling_config scfg;
|
||||
scfg.statement = scheduling_config.statement;
|
||||
scfg.statement_tenants = { {scheduling_config.statement, "$user"}, {default_scheduling_group(), "$system"} };
|
||||
scfg.streaming = scheduling_config.streaming;
|
||||
scfg.gossip = scheduling_config.gossip;
|
||||
netw::get_messaging_service().start(listen, storage_port, ew, cw, tndw, ssl_storage_port, creds, mcfg, scfg, sltba).get();
|
||||
|
||||
@@ -279,7 +279,7 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
|
||||
|
||||
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
|
||||
: messaging_service(std::move(ip), port, encrypt_what::none, compress_what::none, tcp_nodelay_what::all, 0, nullptr, memory_config{1'000'000},
|
||||
scheduling_config{}, false)
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, false)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -391,6 +391,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, _should_listen_to_broadcast_address(sltba)
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
|
||||
, _clients(2 + scfg.statement_tenants.size() * 2)
|
||||
, _mcfg(mcfg)
|
||||
, _scheduling_config(scfg)
|
||||
, _scheduling_info_for_connection_index(initial_scheduling_info())
|
||||
@@ -402,6 +403,11 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
ci.attach_auxiliary("max_result_size", max_result_size.value_or(query::result_memory_limiter::maximum_result_size));
|
||||
return rpc::no_wait;
|
||||
});
|
||||
|
||||
_connection_index_for_tenant.reserve(_scheduling_config.statement_tenants.size());
|
||||
for (unsigned i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
_connection_index_for_tenant.push_back({_scheduling_config.statement_tenants[i].sched_group, i});
|
||||
}
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
@@ -455,24 +461,6 @@ rpc::no_wait_type messaging_service::no_wait() {
|
||||
|
||||
static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
switch (verb) {
|
||||
case messaging_verb::CLIENT_ID:
|
||||
case messaging_verb::MUTATION:
|
||||
case messaging_verb::READ_DATA:
|
||||
case messaging_verb::READ_MUTATION_DATA:
|
||||
case messaging_verb::READ_DIGEST:
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK:
|
||||
case messaging_verb::DEFINITIONS_UPDATE:
|
||||
case messaging_verb::TRUNCATE:
|
||||
case messaging_verb::MIGRATION_REQUEST:
|
||||
case messaging_verb::SCHEMA_CHECK:
|
||||
case messaging_verb::COUNTER_MUTATION:
|
||||
// Use the same RPC client for light weight transaction
|
||||
// protocol steps as for standard mutations and read requests.
|
||||
case messaging_verb::PAXOS_PREPARE:
|
||||
case messaging_verb::PAXOS_ACCEPT:
|
||||
case messaging_verb::PAXOS_LEARN:
|
||||
case messaging_verb::PAXOS_PRUNE:
|
||||
return 0;
|
||||
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
|
||||
// sent on a different connection to avoid potential deadlocks
|
||||
// as well as reduce latency as there are potentially many requests
|
||||
@@ -482,7 +470,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::GOSSIP_SHUTDOWN:
|
||||
case messaging_verb::GOSSIP_ECHO:
|
||||
case messaging_verb::GET_SCHEMA_VERSION:
|
||||
return 1;
|
||||
return 0;
|
||||
case messaging_verb::PREPARE_MESSAGE:
|
||||
case messaging_verb::PREPARE_DONE_MESSAGE:
|
||||
case messaging_verb::STREAM_MUTATION:
|
||||
@@ -505,6 +493,24 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
|
||||
case messaging_verb::HINT_MUTATION:
|
||||
return 1;
|
||||
case messaging_verb::CLIENT_ID:
|
||||
case messaging_verb::MUTATION:
|
||||
case messaging_verb::READ_DATA:
|
||||
case messaging_verb::READ_MUTATION_DATA:
|
||||
case messaging_verb::READ_DIGEST:
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK:
|
||||
case messaging_verb::DEFINITIONS_UPDATE:
|
||||
case messaging_verb::TRUNCATE:
|
||||
case messaging_verb::MIGRATION_REQUEST:
|
||||
case messaging_verb::SCHEMA_CHECK:
|
||||
case messaging_verb::COUNTER_MUTATION:
|
||||
// Use the same RPC client for light weight transaction
|
||||
// protocol steps as for standard mutations and read requests.
|
||||
case messaging_verb::PAXOS_PREPARE:
|
||||
case messaging_verb::PAXOS_ACCEPT:
|
||||
case messaging_verb::PAXOS_LEARN:
|
||||
case messaging_verb::PAXOS_PRUNE:
|
||||
return 2;
|
||||
case messaging_verb::MUTATION_DONE:
|
||||
case messaging_verb::MUTATION_FAILED:
|
||||
@@ -524,18 +530,41 @@ static constexpr std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)>
|
||||
|
||||
static std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)> s_rpc_client_idx_table = make_rpc_client_idx_table();
|
||||
|
||||
static unsigned get_rpc_client_idx(messaging_verb verb) {
|
||||
return s_rpc_client_idx_table[static_cast<size_t>(verb)];
|
||||
unsigned
|
||||
messaging_service::get_rpc_client_idx(messaging_verb verb) const {
|
||||
auto idx = s_rpc_client_idx_table[static_cast<size_t>(verb)];
|
||||
|
||||
if (idx < 2) {
|
||||
return idx;
|
||||
}
|
||||
|
||||
// A statement or statement-ack verb
|
||||
const auto curr_sched_group = current_scheduling_group();
|
||||
for (unsigned i = 0; i < _connection_index_for_tenant.size(); ++i) {
|
||||
if (_connection_index_for_tenant[i].sched_group == curr_sched_group) {
|
||||
// i == 0: the default tenant maps to the default client indexes of 2 and 3.
|
||||
idx += i * 2;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
|
||||
std::vector<messaging_service::scheduling_info_for_connection_index>
|
||||
messaging_service::initial_scheduling_info() const {
|
||||
return std::vector<scheduling_info_for_connection_index>({
|
||||
{ _scheduling_config.statement, "statement" },
|
||||
if (_scheduling_config.statement_tenants.empty()) {
|
||||
throw std::runtime_error("messaging_service::initial_scheduling_info(): must have at least one tenant configured");
|
||||
}
|
||||
auto sched_infos = std::vector<scheduling_info_for_connection_index>({
|
||||
{ _scheduling_config.gossip, "gossip" },
|
||||
{ _scheduling_config.streaming, "streaming", },
|
||||
{ _scheduling_config.statement, "statement-ack" },
|
||||
});
|
||||
sched_infos.reserve(sched_infos.size() + _scheduling_config.statement_tenants.size() * 2);
|
||||
for (const auto& tenant : _scheduling_config.statement_tenants) {
|
||||
sched_infos.push_back({ tenant.sched_group, "statement:" + tenant.name });
|
||||
sched_infos.push_back({ tenant.sched_group, "statement-ack:" + tenant.name });
|
||||
}
|
||||
return sched_infos;
|
||||
};
|
||||
|
||||
scheduling_group
|
||||
@@ -667,7 +696,10 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
}
|
||||
opts.tcp_nodelay = must_tcp_nodelay;
|
||||
opts.reuseaddr = true;
|
||||
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
// We send cookies only for non-default statement tenant clients.
|
||||
if (idx > 3) {
|
||||
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
}
|
||||
|
||||
auto client = must_encrypt ?
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
|
||||
|
||||
@@ -219,7 +219,17 @@ public:
|
||||
};
|
||||
|
||||
struct scheduling_config {
|
||||
scheduling_group statement;
|
||||
struct tenant {
|
||||
scheduling_group sched_group;
|
||||
sstring name;
|
||||
};
|
||||
// Must have at least one element. No two tenants should have the same
|
||||
// scheduling group. [0] is the default tenant, that all unknown
|
||||
// scheduling groups will fall back to. The default tenant should use
|
||||
// the statement scheduling group, for backward compatibility. In fact
|
||||
// any other scheduling group would be dropped as the default tenant,
|
||||
// does not transfer its scheduling group across the wire.
|
||||
std::vector<tenant> statement_tenants;
|
||||
scheduling_group streaming;
|
||||
scheduling_group gossip;
|
||||
};
|
||||
@@ -229,6 +239,10 @@ private:
|
||||
scheduling_group sched_group;
|
||||
sstring isolation_cookie;
|
||||
};
|
||||
struct tenant_connection_index {
|
||||
scheduling_group sched_group;
|
||||
unsigned cliend_idx;
|
||||
};
|
||||
private:
|
||||
gms::inet_address _listen_address;
|
||||
uint16_t _port;
|
||||
@@ -244,13 +258,14 @@ private:
|
||||
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
||||
std::unique_ptr<seastar::tls::credentials_builder> _credentials_builder;
|
||||
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
|
||||
std::array<clients_map, 4> _clients;
|
||||
std::vector<clients_map> _clients;
|
||||
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
|
||||
bool _stopping = false;
|
||||
std::list<std::function<void(gms::inet_address ep)>> _connection_drop_notifiers;
|
||||
memory_config _mcfg;
|
||||
scheduling_config _scheduling_config;
|
||||
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
|
||||
std::vector<tenant_connection_index> _connection_index_for_tenant;
|
||||
public:
|
||||
using clock_type = lowres_clock;
|
||||
public:
|
||||
@@ -532,6 +547,7 @@ public:
|
||||
scheduling_group scheduling_group_for_verb(messaging_verb verb) const;
|
||||
scheduling_group scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const;
|
||||
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
|
||||
unsigned get_rpc_client_idx(messaging_verb verb) const;
|
||||
};
|
||||
|
||||
extern distributed<messaging_service> _the_messaging_service;
|
||||
|
||||
Reference in New Issue
Block a user