Merge "Make front-end processing scheduling aware" from Avi

"
This patchset runs the protocol servers under the "statement" scheduling
group, and makes all execution_stages in that path scheduling aware.

I used inheriting_concrete_execution_stage instead of passing the
scheduling group to concrete_execution_stage's constructor for two
reasons:

 1. For cql statements, there is no easily accessible object that
    can host the concrete_execution_stage and be reached from both
    main.cc and the statements,
 2. In the future, we will want to assign users to different
    scheduling_groups, thus providing performance isolation for
    service-level agreements (SLAs). Using an inheriting
    execution_stage allows us to make the scheduling_group decision
    in one place.

Depends on two unmerged patches in seastar, one fixing
inheriting_concrete_execution_stage compilation with reference parameters,
and one making smp::submit_to() scheduling aware.
"

* tag 'cql-sched/v1' of https://github.com/avikivity/scylla:
  cql: make modification_statement execution_stage scheduling aware
  cql: make batch_statement execution_stage scheduling aware
  cql: make select_statement execution_stage scheduling aware
  transport: make native protocol request processing execution_stage scheduling aware
  main: start client protocol servers under the statement scheduling group
This commit is contained in:
Paweł Dziepak
2018-06-18 16:38:30 +01:00
6 changed files with 41 additions and 12 deletions

View File

@@ -283,7 +283,14 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
struct batch_statement_executor {
static auto get() { return &batch_statement::do_execute; }
};
static thread_local auto batch_stage = seastar::make_execution_stage("cql3_batch", batch_statement_executor::get());
static thread_local inheriting_concrete_execution_stage<
future<shared_ptr<cql_transport::messages::result_message>>,
batch_statement*,
service::storage_proxy&,
service::query_state&,
const query_options&,
bool,
api::timestamp_type> batch_stage{"cql3_batch", batch_statement_executor::get()};
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
service::storage_proxy& storage, service::query_state& state, const query_options& options) {

View File

@@ -365,7 +365,12 @@ modification_statement::build_partition_keys(const query_options& options, const
struct modification_statement_executor {
static auto get() { return &modification_statement::do_execute; }
};
static thread_local auto modify_stage = seastar::make_execution_stage("cql3_modification", modification_statement_executor::get());
static thread_local inheriting_concrete_execution_stage<
future<::shared_ptr<cql_transport::messages::result_message>>,
modification_statement*,
service::storage_proxy&,
service::query_state&,
const query_options&> modify_stage{"cql3_modification", modification_statement_executor::get()};
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) {

View File

@@ -353,7 +353,13 @@ bool select_statement::needs_post_query_ordering() const {
struct select_statement_executor {
static auto get() { return &select_statement::do_execute; }
};
static thread_local auto select_stage = seastar::make_execution_stage("cql3_select", select_statement_executor::get());
static thread_local inheriting_concrete_execution_stage<
future<shared_ptr<cql_transport::messages::result_message>>,
select_statement*,
service::storage_proxy&,
service::query_state&,
const query_options&> select_stage{"cql3_select", select_statement_executor::get()};
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(service::storage_proxy& proxy,

View File

@@ -730,9 +730,13 @@ int main(int ac, char** av) {
}
supervisor::notify("starting native transport");
service::get_local_storage_service().start_native_transport().get();
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_native_transport();
}).get();
if (start_thrift) {
service::get_local_storage_service().start_rpc_server().get();
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_rpc_server();
}).get();
}
if (cfg->defragment_memory_on_idle()) {
smp::invoke_on_all([] () {

View File

@@ -628,10 +628,8 @@ future<> cql_server::connection::shutdown()
return make_ready_future<>();
}
struct process_request_executor {
static auto get() { return &cql_server::connection::process_request_one; }
};
static thread_local auto process_request_stage = seastar::make_execution_stage("transport", process_request_executor::get());
thread_local cql_server::connection::execution_stage_type
cql_server::connection::_process_request_stage{"transport", &connection::process_request_one};
void cql_server::connection::update_client_state(processing_result& response) {
if (response.keyspace) {
@@ -707,10 +705,10 @@ future<> cql_server::connection::process_request() {
auto cpu = pick_request_cpu();
return [&] {
if (cpu == engine().cpu_id()) {
return process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested);
return _process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, _client_state, _client_state.get_timestamp()), tracing_requested);
} else {
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested, ts = _client_state.get_timestamp()] () mutable {
return process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, client_state, ts), tracing_requested);
return _process_request_stage(this, bv, op, stream, service::client_state(service::client_state::request_copy_tag{}, client_state, ts), tracing_requested);
});
}
}().then_wrapped([this, buf = std::move(buf), mem_permit = std::move(mem_permit), leave = std::move(leave)] (future<processing_result> response_f) {

View File

@@ -140,7 +140,6 @@ public:
private:
class fmt_visitor;
friend class connection;
friend class process_request_executor;
class connection : public boost::intrusive::list_base_hook<> {
struct processing_result {
foreign_ptr<shared_ptr<cql_server::response>> cql_response;
@@ -175,6 +174,16 @@ private:
no_write_on_close,
write_on_close
};
private:
using execution_stage_type = inheriting_concrete_execution_stage<
future<cql_server::connection::processing_result>,
cql_server::connection*,
bytes_view,
uint8_t,
uint16_t,
service::client_state,
tracing_request_type>;
static thread_local execution_stage_type _process_request_stage;
public:
connection(cql_server& server, ipv4_addr server_addr, connected_socket&& fd, socket_address addr);
~connection();