transport: remove cql server load balancer

It is buggy, unused and unnecessary complicates the code.
This commit is contained in:
Gleb Natapov
2019-07-30 15:38:09 +03:00
parent 547c072f93
commit 7e3805ed3d
4 changed files with 11 additions and 53 deletions

View File

@@ -635,7 +635,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, api_address(this, "api_address", value_status::Used, "", "Http Rest API address")
, api_ui_dir(this, "api_ui_dir", value_status::Used, "swagger-ui/dist/", "The directory location of the API GUI")
, api_doc_dir(this, "api_doc_dir", value_status::Used, "api/api-doc/", "The API definition file directory")
, load_balance(this, "load_balance", value_status::Used, "none", "CQL request load balancing: 'none' or round-robin'")
, load_balance(this, "load_balance", value_status::Unused, "none", "CQL request load balancing: 'none' or round-robin'")
, consistent_rangemovement(this, "consistent_rangemovement", value_status::Used, true, "When set to true, range movements will be consistent. It means: 1) it will refuse to bootstrap a new node if other bootstrapping/leaving/moving nodes detected. 2) data will be streamed to a new node only from the node which is no longer responsible for the token range. Same as -Dcassandra.consistent.rangemovement in cassandra")
, join_ring(this, "join_ring", value_status::Used, true, "When set to true, a node will join the token ring. When set to false, a node will not join the token ring. User can use nodetool join to initiate ring joinging later. Same as -Dcassandra.join_ring in cassandra.")
, load_ring_state(this, "load_ring_state", value_status::Used, true, "When set to true, load tokens and host_ids previously saved. Same as -Dcassandra.load_ring_state in cassandra.")

View File

@@ -2264,9 +2264,8 @@ future<> storage_service::start_native_transport() {
cql_server_config.timeout_config = make_timeout_config(cfg);
cql_server_config.max_request_size = ss._db.local().get_available_memory() / 10;
cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers();
cql_transport::cql_load_balance lb = cql_transport::parse_load_balance(cfg.load_balance());
return seastar::net::dns::resolve_name(addr).then([&ss, cserver, addr, &cfg, lb, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) {
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb, std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
return seastar::net::dns::resolve_name(addr).then([&ss, cserver, addr, &cfg, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) {
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
// #293 - do not stop anything
//engine().at_exit([cserver] {
// return cserver->stop();

View File

@@ -144,18 +144,7 @@ event::event_type parse_event_type(const sstring& value)
}
}
cql_load_balance parse_load_balance(sstring value)
{
if (value == "none") {
return cql_load_balance::none;
} else if (value == "round-robin") {
return cql_load_balance::round_robin;
} else {
throw std::invalid_argument("Unknown load balancing algorithm: " + value);
}
}
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service& auth_service,
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, auth::service& auth_service,
cql_server_config config)
: _proxy(proxy)
, _query_processor(qp)
@@ -163,7 +152,6 @@ cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<c
, _max_request_size(config.max_request_size)
, _memory_available(_max_request_size)
, _notifier(std::make_unique<event_notifier>())
, _lb(lb)
, _auth_service(auth_service)
{
namespace sm = seastar::metrics;
@@ -607,29 +595,16 @@ future<> cql_server::connection::process_request() {
// Replacing the immediately-invoked lambda below with just its body costs 5-10 usec extra per invocation.
// Cause not understood.
auto istream = buf.get_istream();
[&] {
auto cpu = pick_request_cpu();
return [&] {
if (cpu == engine().cpu_id()) {
return _process_request_stage(this, istream, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested);
} else {
// We should avoid sending non-trivial objects across shards.
static_assert(std::is_trivially_destructible_v<fragmented_temporary_buffer::istream>);
static_assert(std::is_trivially_copyable_v<fragmented_temporary_buffer::istream>);
return smp::submit_to(cpu, [this, istream, op, stream, client_state = _client_state, tracing_requested, ts = _client_state.get_timestamp()] () mutable {
return _process_request_stage(this, istream, op, stream, service::client_state(service::client_state::request_copy_tag{}, client_state, ts), tracing_requested);
});
}
}().then_wrapped([this, buf = std::move(buf), mem_permit = std::move(mem_permit), leave = std::move(leave)] (future<processing_result> response_f) {
try {
(void)_process_request_stage(this, istream, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested)
.then_wrapped([this, buf = std::move(buf), mem_permit = std::move(mem_permit), leave = std::move(leave)] (future<processing_result> response_f) {
try {
auto response = response_f.get0();
update_client_state(response);
write_response(std::move(response.cql_response), _compression);
} catch (...) {
} catch (...) {
clogger.error("request processing failed: {}", std::current_exception());
}
});
}();
}
});
return make_ready_future<>();
});
@@ -713,14 +688,6 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
return _buffer_reader.read_exactly(_read_buf, length);
}
unsigned cql_server::connection::pick_request_cpu()
{
if (_server._lb == cql_load_balance::round_robin) {
return _request_cpu++ % smp::count;
}
return engine().cpu_id();
}
future<response_type> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state client_state)
{
auto options = in.read_string_map();

View File

@@ -89,13 +89,6 @@ struct [[gnu::packed]] cql_binary_frame_v3 {
}
};
enum class cql_load_balance {
none,
round_robin,
};
cql_load_balance parse_load_balance(sstring value);
struct cql_query_state {
service::query_state query_state;
std::unique_ptr<cql3::query_options> options;
@@ -131,10 +124,9 @@ private:
uint64_t _requests_served = 0;
uint64_t _requests_serving = 0;
uint64_t _requests_blocked_memory = 0;
cql_load_balance _lb;
auth::service& _auth_service;
public:
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb, auth::service&,
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, auth::service&,
cql_server_config config);
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
future<> do_accepts(int which, bool keepalive, socket_address server_addr);