Merge "Make front-end processing scheduling aware" from Avi
"
This patchset runs the protocol servers under the "statement" scheduling
group, and makes all execution_stages in that path scheduling aware.
I used inheriting_concrete_execution_stage instead of passing the
scheduling group to concrete_execution_stage's constructor for two
reasons:
1. For cql statements, there is no easily accessible object that
can host the concrete_execution_stage and be reached from both
main.cc and the statements,
2. In the future, we will want to assign users to different
scheduling_groups, thus providing performance isolation for
service-level agreements (SLAs). Using an inheriting
execution_stage allows us to make the scheduling_group decision
in one place.
Depends on two unmerged patches in seastar, one fixing
inheriting_concrete_execution_stage compilation with reference parameters,
and one making smp::submit_to() scheduling aware.
"
* tag 'cql-sched/v1' of https://github.com/avikivity/scylla:
cql: make modification_statement execution_stage scheduling aware
cql: make batch_statement execution_stage scheduling aware
cql: make select_statement execution_stage scheduling aware
transport: make native protocol request processing execution_stage scheduling aware
main: start client protocol servers under the statement scheduling group
This commit is contained in:
@@ -283,7 +283,14 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
|
||||
struct batch_statement_executor {
|
||||
static auto get() { return &batch_statement::do_execute; }
|
||||
};
|
||||
static thread_local auto batch_stage = seastar::make_execution_stage("cql3_batch", batch_statement_executor::get());
|
||||
static thread_local inheriting_concrete_execution_stage<
|
||||
future<shared_ptr<cql_transport::messages::result_message>>,
|
||||
batch_statement*,
|
||||
service::storage_proxy&,
|
||||
service::query_state&,
|
||||
const query_options&,
|
||||
bool,
|
||||
api::timestamp_type> batch_stage{"cql3_batch", batch_statement_executor::get()};
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
|
||||
service::storage_proxy& storage, service::query_state& state, const query_options& options) {
|
||||
|
||||
@@ -365,7 +365,12 @@ modification_statement::build_partition_keys(const query_options& options, const
|
||||
struct modification_statement_executor {
|
||||
static auto get() { return &modification_statement::do_execute; }
|
||||
};
|
||||
static thread_local auto modify_stage = seastar::make_execution_stage("cql3_modification", modification_statement_executor::get());
|
||||
static thread_local inheriting_concrete_execution_stage<
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>,
|
||||
modification_statement*,
|
||||
service::storage_proxy&,
|
||||
service::query_state&,
|
||||
const query_options&> modify_stage{"cql3_modification", modification_statement_executor::get()};
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
modification_statement::execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) {
|
||||
|
||||
@@ -353,7 +353,13 @@ bool select_statement::needs_post_query_ordering() const {
|
||||
struct select_statement_executor {
|
||||
static auto get() { return &select_statement::do_execute; }
|
||||
};
|
||||
static thread_local auto select_stage = seastar::make_execution_stage("cql3_select", select_statement_executor::get());
|
||||
|
||||
static thread_local inheriting_concrete_execution_stage<
|
||||
future<shared_ptr<cql_transport::messages::result_message>>,
|
||||
select_statement*,
|
||||
service::storage_proxy&,
|
||||
service::query_state&,
|
||||
const query_options&> select_stage{"cql3_select", select_statement_executor::get()};
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
select_statement::execute(service::storage_proxy& proxy,
|
||||
|
||||
8
main.cc
8
main.cc
@@ -730,9 +730,13 @@ int main(int ac, char** av) {
|
||||
}
|
||||
|
||||
supervisor::notify("starting native transport");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
|
||||
return service::get_local_storage_service().start_native_transport();
|
||||
}).get();
|
||||
if (start_thrift) {
|
||||
service::get_local_storage_service().start_rpc_server().get();
|
||||
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
|
||||
return service::get_local_storage_service().start_rpc_server();
|
||||
}).get();
|
||||
}
|
||||
if (cfg->defragment_memory_on_idle()) {
|
||||
smp::invoke_on_all([] () {
|
||||
|
||||
@@ -628,10 +628,8 @@ future<> cql_server::connection::shutdown()
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
struct process_request_executor {
|
||||
static auto get() { return &cql_server::connection::process_request_one; }
|
||||
};
|
||||
static thread_local auto process_request_stage = seastar::make_execution_stage("transport", process_request_executor::get());
|
||||
thread_local cql_server::connection::execution_stage_type
|
||||
cql_server::connection::_process_request_stage{"transport", &connection::process_request_one};
|
||||
|
||||
void cql_server::connection::update_client_state(processing_result& response) {
|
||||
if (response.keyspace) {
|
||||
@@ -707,10 +705,10 @@ future<> cql_server::connection::process_request() {
|
||||
auto cpu = pick_request_cpu();
|
||||
return [&] {
|
||||
if (cpu == engine().cpu_id()) {
|
||||
return process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested);
|
||||
return _process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested);
|
||||
} else {
|
||||
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested, ts = _client_state.get_timestamp()] () mutable {
|
||||
return process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, client_state, ts), tracing_requested);
|
||||
return _process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, client_state, ts), tracing_requested);
|
||||
});
|
||||
}
|
||||
}().then_wrapped([this, buf = std::move(buf), mem_permit = std::move(mem_permit), leave = std::move(leave)] (future<processing_result> response_f) {
|
||||
|
||||
@@ -140,7 +140,6 @@ public:
|
||||
private:
|
||||
class fmt_visitor;
|
||||
friend class connection;
|
||||
friend class process_request_executor;
|
||||
class connection : public boost::intrusive::list_base_hook<> {
|
||||
struct processing_result {
|
||||
foreign_ptr<shared_ptr<cql_server::response>> cql_response;
|
||||
@@ -175,6 +174,16 @@ private:
|
||||
no_write_on_close,
|
||||
write_on_close
|
||||
};
|
||||
private:
|
||||
using execution_stage_type = inheriting_concrete_execution_stage<
|
||||
future<cql_server::connection::processing_result>,
|
||||
cql_server::connection*,
|
||||
bytes_view,
|
||||
uint8_t,
|
||||
uint16_t,
|
||||
service::client_state,
|
||||
tracing_request_type>;
|
||||
static thread_local execution_stage_type _process_request_stage;
|
||||
public:
|
||||
connection(cql_server& server, ipv4_addr server_addr, connected_socket&& fd, socket_address addr);
|
||||
~connection();
|
||||
|
||||
Reference in New Issue
Block a user