transport: remove cql server load balancer
It is buggy, unused and unnecessary complicates the code.
This commit is contained in:
@@ -635,7 +635,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, api_address(this, "api_address", value_status::Used, "", "Http Rest API address")
|
||||
, api_ui_dir(this, "api_ui_dir", value_status::Used, "swagger-ui/dist/", "The directory location of the API GUI")
|
||||
, api_doc_dir(this, "api_doc_dir", value_status::Used, "api/api-doc/", "The API definition file directory")
|
||||
, load_balance(this, "load_balance", value_status::Used, "none", "CQL request load balancing: 'none' or round-robin'")
|
||||
, load_balance(this, "load_balance", value_status::Unused, "none", "CQL request load balancing: 'none' or round-robin'")
|
||||
, consistent_rangemovement(this, "consistent_rangemovement", value_status::Used, true, "When set to true, range movements will be consistent. It means: 1) it will refuse to bootstrap a new node if other bootstrapping/leaving/moving nodes detected. 2) data will be streamed to a new node only from the node which is no longer responsible for the token range. Same as -Dcassandra.consistent.rangemovement in cassandra")
|
||||
, join_ring(this, "join_ring", value_status::Used, true, "When set to true, a node will join the token ring. When set to false, a node will not join the token ring. User can use nodetool join to initiate ring joinging later. Same as -Dcassandra.join_ring in cassandra.")
|
||||
, load_ring_state(this, "load_ring_state", value_status::Used, true, "When set to true, load tokens and host_ids previously saved. Same as -Dcassandra.load_ring_state in cassandra.")
|
||||
|
||||
@@ -2264,9 +2264,8 @@ future<> storage_service::start_native_transport() {
|
||||
cql_server_config.timeout_config = make_timeout_config(cfg);
|
||||
cql_server_config.max_request_size = ss._db.local().get_available_memory() / 10;
|
||||
cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers();
|
||||
cql_transport::cql_load_balance lb = cql_transport::parse_load_balance(cfg.load_balance());
|
||||
return seastar::net::dns::resolve_name(addr).then([&ss, cserver, addr, &cfg, lb, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) {
|
||||
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb, std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
|
||||
return seastar::net::dns::resolve_name(addr).then([&ss, cserver, addr, &cfg, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) {
|
||||
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([cserver] {
|
||||
// return cserver->stop();
|
||||
|
||||
@@ -144,18 +144,7 @@ event::event_type parse_event_type(const sstring& value)
|
||||
}
|
||||
}
|
||||
|
||||
cql_load_balance parse_load_balance(sstring value)
|
||||
{
|
||||
if (value == "none") {
|
||||
return cql_load_balance::none;
|
||||
} else if (value == "round-robin") {
|
||||
return cql_load_balance::round_robin;
|
||||
} else {
|
||||
throw std::invalid_argument("Unknown load balancing algorithm: " + value);
|
||||
}
|
||||
}
|
||||
|
||||
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service& auth_service,
|
||||
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, auth::service& auth_service,
|
||||
cql_server_config config)
|
||||
: _proxy(proxy)
|
||||
, _query_processor(qp)
|
||||
@@ -163,7 +152,6 @@ cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<c
|
||||
, _max_request_size(config.max_request_size)
|
||||
, _memory_available(_max_request_size)
|
||||
, _notifier(std::make_unique<event_notifier>())
|
||||
, _lb(lb)
|
||||
, _auth_service(auth_service)
|
||||
{
|
||||
namespace sm = seastar::metrics;
|
||||
@@ -607,29 +595,16 @@ future<> cql_server::connection::process_request() {
|
||||
// Replacing the immediately-invoked lambda below with just its body costs 5-10 usec extra per invocation.
|
||||
// Cause not understood.
|
||||
auto istream = buf.get_istream();
|
||||
[&] {
|
||||
auto cpu = pick_request_cpu();
|
||||
return [&] {
|
||||
if (cpu == engine().cpu_id()) {
|
||||
return _process_request_stage(this, istream, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested);
|
||||
} else {
|
||||
// We should avoid sending non-trivial objects across shards.
|
||||
static_assert(std::is_trivially_destructible_v<fragmented_temporary_buffer::istream>);
|
||||
static_assert(std::is_trivially_copyable_v<fragmented_temporary_buffer::istream>);
|
||||
return smp::submit_to(cpu, [this, istream, op, stream, client_state = _client_state, tracing_requested, ts = _client_state.get_timestamp()] () mutable {
|
||||
return _process_request_stage(this, istream, op, stream, service::client_state(service::client_state::request_copy_tag{}, client_state, ts), tracing_requested);
|
||||
});
|
||||
}
|
||||
}().then_wrapped([this, buf = std::move(buf), mem_permit = std::move(mem_permit), leave = std::move(leave)] (future<processing_result> response_f) {
|
||||
try {
|
||||
(void)_process_request_stage(this, istream, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested)
|
||||
.then_wrapped([this, buf = std::move(buf), mem_permit = std::move(mem_permit), leave = std::move(leave)] (future<processing_result> response_f) {
|
||||
try {
|
||||
auto response = response_f.get0();
|
||||
update_client_state(response);
|
||||
write_response(std::move(response.cql_response), _compression);
|
||||
} catch (...) {
|
||||
} catch (...) {
|
||||
clogger.error("request processing failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
}();
|
||||
}
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -713,14 +688,6 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
|
||||
return _buffer_reader.read_exactly(_read_buf, length);
|
||||
}
|
||||
|
||||
unsigned cql_server::connection::pick_request_cpu()
|
||||
{
|
||||
if (_server._lb == cql_load_balance::round_robin) {
|
||||
return _request_cpu++ % smp::count;
|
||||
}
|
||||
return engine().cpu_id();
|
||||
}
|
||||
|
||||
future<response_type> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state client_state)
|
||||
{
|
||||
auto options = in.read_string_map();
|
||||
|
||||
@@ -89,13 +89,6 @@ struct [[gnu::packed]] cql_binary_frame_v3 {
|
||||
}
|
||||
};
|
||||
|
||||
enum class cql_load_balance {
|
||||
none,
|
||||
round_robin,
|
||||
};
|
||||
|
||||
cql_load_balance parse_load_balance(sstring value);
|
||||
|
||||
struct cql_query_state {
|
||||
service::query_state query_state;
|
||||
std::unique_ptr<cql3::query_options> options;
|
||||
@@ -131,10 +124,9 @@ private:
|
||||
uint64_t _requests_served = 0;
|
||||
uint64_t _requests_serving = 0;
|
||||
uint64_t _requests_blocked_memory = 0;
|
||||
cql_load_balance _lb;
|
||||
auth::service& _auth_service;
|
||||
public:
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service&,
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, auth::service&,
|
||||
cql_server_config config);
|
||||
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive, socket_address server_addr);
|
||||
|
||||
Reference in New Issue
Block a user