transport/controller: split configuring sockets into separate functions

TCP sockets and unix domain sockets don't share common listen options
excluding `socket_address`. For unix domain sockets, available options will be
expanded to cover also filesystem permissions and owner for the socket.
Storing listen options for both types of sockets in one structure would become messy.
For now, both use `listen_cfg`.

In a singular cql controller, only sockets of one type are created, thus it
can be easily split into two cases.
Isolate maintenance socket from `listen_cfg`.
This commit is contained in:
Mikołaj Grzebieluch
2024-01-26 16:17:55 +01:00
parent bd3ed168ab
commit 6b178f9a4a
2 changed files with 122 additions and 97 deletions

View File

@@ -65,6 +65,119 @@ future<> controller::start_server() {
return do_start_server().finally([this] { _ops_sem.signal(); });
}
static future<> listen_on_all_shards(sharded<cql_server>& cserver, socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool is_shard_aware, bool keepalive) {
co_await cserver.invoke_on_all([addr, creds, is_shard_aware, keepalive] (cql_server& server) {
return server.listen(addr, creds, is_shard_aware, keepalive);
});
logger.info("Starting listening for CQL clients on {} ({}, {})"
, addr, creds ? "encrypted" : "unencrypted", is_shard_aware ? "shard-aware" : "non-shard-aware"
);
}
future<> controller::start_listening_on_tcp_sockets(sharded<cql_server>& cserver) {
auto& cfg = _config;
auto preferred = cfg.rpc_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
auto family = cfg.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
auto ceo = cfg.client_encryption_options();
auto keepalive = cfg.rpc_keepalive();
struct listen_cfg {
socket_address addr;
bool is_shard_aware;
std::shared_ptr<seastar::tls::credentials_builder> cred;
};
_listen_addresses.clear();
std::vector<listen_cfg> configs;
const seastar::net::inet_address ip = utils::resolve(cfg.rpc_address, family, preferred).get0();
int native_port_idx = -1, native_shard_aware_port_idx = -1;
if (cfg.native_transport_port.is_set() ||
(!cfg.native_transport_port_ssl.is_set() && !cfg.native_transport_port.is_set())) {
// Non-SSL port is specified || neither SSL nor non-SSL ports are specified
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
_listen_addresses.push_back(configs.back().addr);
native_port_idx = 0;
}
if (cfg.native_shard_aware_transport_port.is_set() ||
(!cfg.native_shard_aware_transport_port_ssl.is_set() && !cfg.native_shard_aware_transport_port.is_set())) {
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
_listen_addresses.push_back(configs.back().addr);
native_shard_aware_port_idx = native_port_idx + 1;
}
// main should have made sure values are clean and neatish
if (utils::is_true(utils::get_or_default(ceo, "enabled", "false"))) {
auto cred = std::make_shared<seastar::tls::credentials_builder>();
utils::configure_tls_creds_builder(*cred, std::move(ceo)).get();
logger.info("Enabling encrypted CQL connections between client and server");
if (cfg.native_transport_port_ssl.is_set() &&
(!cfg.native_transport_port.is_set() ||
cfg.native_transport_port_ssl() != cfg.native_transport_port())) {
// SSL port is specified && non-SSL port is either left out or set to a different value
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred});
_listen_addresses.push_back(configs.back().addr);
} else if (native_port_idx >= 0) {
configs[native_port_idx].cred = cred;
}
if (cfg.native_shard_aware_transport_port_ssl.is_set() &&
(!cfg.native_shard_aware_transport_port.is_set() ||
cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port())) {
configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)});
_listen_addresses.push_back(configs.back().addr);
} else if (native_shard_aware_port_idx >= 0) {
configs[native_shard_aware_port_idx].cred = std::move(cred);
}
}
return parallel_for_each(configs, [&cserver, keepalive](const listen_cfg & cfg) {
return listen_on_all_shards(cserver, cfg.addr, cfg.cred, cfg.is_shard_aware, keepalive);
});
}
future<> controller::start_listening_on_maintenance_socket(sharded<cql_server>& cserver) {
auto socket = _config.maintenance_socket();
if (socket == "workdir") {
socket = _config.work_directory() + "/cql.m";
}
if (socket.length() > 107) {
throw std::runtime_error(format("Maintenance socket path is too long: {}. Change it to string shorter than 108 chars.", socket));
}
struct stat statbuf;
auto stat_result = ::stat(socket.c_str(), &statbuf);
if (stat_result == 0) {
// Check if it is a unix domain socket, not a regular file or directory
if (!S_ISSOCK(statbuf.st_mode)) {
throw std::runtime_error(format("Under maintenance socket path ({}) there is something else.", socket));
}
} else if (errno != ENOENT) {
// Other error than "file does not exist"
throw std::runtime_error(format("Failed to stat {}: {}", socket, strerror(errno)));
}
// Remove the socket if it already exists, otherwise when the server
// tries to listen on it, it will hang on bind().
auto unlink_result = ::unlink(socket.c_str());
if (unlink_result < 0 && errno != ENOENT) {
// Other error than "file does not exist"
throw std::runtime_error(format("Failed to unlink {}: {}", socket, strerror(errno)));
}
auto addr = socket_address { unix_domain_addr { socket } };
_listen_addresses.push_back(addr);
logger.info("Setting up maintenance socket on {}", socket);
return listen_on_all_shards(cserver, addr, nullptr, false, _config.rpc_keepalive());
}
future<> controller::do_start_server() {
if (_server) {
return make_ready_future<>();
@@ -74,10 +187,6 @@ future<> controller::do_start_server() {
auto cserver = std::make_unique<sharded<cql_server>>();
auto& cfg = _config;
auto preferred = cfg.rpc_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
auto family = cfg.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
auto ceo = cfg.client_encryption_options();
auto keepalive = cfg.rpc_keepalive();
smp_service_group_config cql_server_smp_service_group_config;
cql_server_smp_service_group_config.max_nonlocal_requests = 5000;
auto bounce_request_smp_service_group = create_smp_service_group(cql_server_smp_service_group_config).get();
@@ -110,92 +219,6 @@ future<> controller::do_start_server() {
std::shared_ptr<seastar::tls::credentials_builder> cred;
};
_listen_addresses.clear();
std::vector<listen_cfg> configs;
if (!_used_by_maintenance_socket) {
const seastar::net::inet_address ip = utils::resolve(cfg.rpc_address, family, preferred).get();
int native_port_idx = -1, native_shard_aware_port_idx = -1;
if (cfg.native_transport_port.is_set() ||
(!cfg.native_transport_port_ssl.is_set() && !cfg.native_transport_port.is_set())) {
// Non-SSL port is specified || neither SSL nor non-SSL ports are specified
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
_listen_addresses.push_back(configs.back().addr);
native_port_idx = 0;
}
if (cfg.native_shard_aware_transport_port.is_set() ||
(!cfg.native_shard_aware_transport_port_ssl.is_set() && !cfg.native_shard_aware_transport_port.is_set())) {
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
_listen_addresses.push_back(configs.back().addr);
native_shard_aware_port_idx = native_port_idx + 1;
}
// main should have made sure values are clean and neatish
if (utils::is_true(utils::get_or_default(ceo, "enabled", "false"))) {
auto cred = std::make_shared<seastar::tls::credentials_builder>();
utils::configure_tls_creds_builder(*cred, std::move(ceo)).get();
logger.info("Enabling encrypted CQL connections between client and server");
if (cfg.native_transport_port_ssl.is_set() &&
(!cfg.native_transport_port.is_set() ||
cfg.native_transport_port_ssl() != cfg.native_transport_port())) {
// SSL port is specified && non-SSL port is either left out or set to a different value
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred});
_listen_addresses.push_back(configs.back().addr);
} else if (native_port_idx >= 0) {
configs[native_port_idx].cred = cred;
}
if (cfg.native_shard_aware_transport_port_ssl.is_set() &&
(!cfg.native_shard_aware_transport_port.is_set() ||
cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port())) {
configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)});
_listen_addresses.push_back(configs.back().addr);
} else if (native_shard_aware_port_idx >= 0) {
configs[native_shard_aware_port_idx].cred = std::move(cred);
}
}
} else {
auto socket = cfg.maintenance_socket();
if (socket == "workdir") {
socket = cfg.work_directory() + "/cql.m";
}
if (socket.length() > 107) {
throw std::runtime_error(format("Maintenance socket path is too long: {}. Change it to string shorter than 108 chars.", socket));
}
struct stat statbuf;
auto stat_result = ::stat(socket.c_str(), &statbuf);
if (stat_result == 0) {
// Check if it is a unix domain socket, not a regular file or directory
if (!S_ISSOCK(statbuf.st_mode)) {
throw std::runtime_error(format("Under maintenance socket path ({}) there is something else.", socket));
}
} else if (errno != ENOENT) {
// Other error than "file does not exist"
throw std::runtime_error(format("Failed to stat {}: {}", socket, strerror(errno)));
}
// Remove the socket if it already exists, otherwise when the server
// tries to listen on it, it will hang on bind().
auto unlink_result = ::unlink(socket.c_str());
if (unlink_result < 0 && errno != ENOENT) {
// Other error than "file does not exist"
throw std::runtime_error(format("Failed to unlink {}: {}", socket, strerror(errno)));
}
configs.emplace_back(listen_cfg {
.addr = socket_address { unix_domain_addr { socket } },
.is_shard_aware = false
});
_listen_addresses.push_back(configs.back().addr);
logger.info("Setting up maintenance socket on {}", socket);
}
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mem_limiter), std::move(get_cql_server_config), std::ref(cfg), std::ref(_sl_controller), std::ref(_gossiper), _cql_opcode_stats_key, _used_by_maintenance_socket).get();
auto on_error = defer([&cserver] { cserver->stop().get(); });
@@ -204,13 +227,12 @@ future<> controller::do_start_server() {
unsubscribe_server(*cserver).get();
});
parallel_for_each(configs, [&cserver, keepalive](const listen_cfg & cfg) {
return cserver->invoke_on_all(&cql_server::listen, cfg.addr, cfg.cred, cfg.is_shard_aware, keepalive).then([cfg] {
logger.info("Starting listening for CQL clients on {} ({}, {})"
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted", cfg.is_shard_aware ? "shard-aware" : "non-shard-aware"
);
});
}).get();
_listen_addresses.clear();
if (!_used_by_maintenance_socket) {
start_listening_on_tcp_sockets(*cserver).get();
} else {
start_listening_on_maintenance_socket(*cserver).get();
}
set_cql_ready(true).get();

View File

@@ -59,6 +59,9 @@ class controller : public protocol_server {
future<> subscribe_server(sharded<cql_server>& server);
future<> unsubscribe_server(sharded<cql_server>& server);
future<> start_listening_on_tcp_sockets(sharded<cql_server>& cserver);
future<> start_listening_on_maintenance_socket(sharded<cql_server>& cserver);
maintenance_socket_enabled _used_by_maintenance_socket;
public: