transport: cql_server::connection: paralellization of cql requests handling

Take the request processing part into the separate function and
run it in parallel for incoming requests.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
Vlad Zolotarov
2015-08-04 12:58:01 +03:00
committed by Avi Kivity
parent 8bc863bd14
commit 707f3b25e3

View File

@@ -135,6 +135,7 @@ class cql_server::connection {
connected_socket _fd;
input_stream<char> _read_buf;
output_stream<char> _write_buf;
seastar::gate _pending_requests_gate;
future<> _ready_to_respond = make_ready_future<>();
uint8_t _version = 0;
serialization_format _serialization_format = serialization_format::use_16_bit();
@@ -150,10 +151,18 @@ public:
{ }
future<> process() {
return do_until([this] { return _read_buf.eof(); }, [this] { return process_request(); })
.finally([this] { return std::move(_ready_to_respond); });
.finally([this] {
return _pending_requests_gate.close().then([this] {
std::move(_ready_to_respond);
});
});
}
future<> process_request();
private:
future<> process_request_one(temporary_buffer<char> buf,
uint8_t op,
uint16_t stream);
unsigned frame_size() const;
cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf);
future<std::experimental::optional<cql_binary_frame_v3>> read_frame();
@@ -383,6 +392,44 @@ cql_server::connection::read_frame() {
}
}
future<> cql_server::connection::process_request_one(temporary_buffer<char> buf,
uint8_t op,
uint16_t stream) {
return make_ready_future<>().then([this, op, stream, buf = std::move(buf)] () mutable {
switch (static_cast<cql_binary_opcode>(op)) {
case cql_binary_opcode::STARTUP: return process_startup(stream, std::move(buf));
case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(stream, std::move(buf));
case cql_binary_opcode::OPTIONS: return process_options(stream, std::move(buf));
case cql_binary_opcode::QUERY: return process_query(stream, std::move(buf));
case cql_binary_opcode::PREPARE: return process_prepare(stream, std::move(buf));
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(buf));
case cql_binary_opcode::BATCH: return process_batch(stream, std::move(buf));
case cql_binary_opcode::REGISTER: return process_register(stream, std::move(buf));
default: throw exceptions::protocol_exception(sprint("Unknown opcode %d", op));
}
}).then_wrapped([stream, this] (future<> f) {
--_server._requests_serving;
try {
f.get();
return make_ready_future<>();
} catch (const exceptions::unavailable_exception& ex) {
return write_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive);
} catch (const exceptions::read_timeout_exception& ex) {
return write_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present);
} catch (const exceptions::already_exists_exception& ex) {
return write_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name);
} catch (const exceptions::prepared_query_not_found_exception& ex) {
return write_unprepared_error(stream, ex.code(), ex.what(), ex.id);
} catch (const exceptions::cassandra_exception& ex) {
return write_error(stream, ex.code(), ex.what());
} catch (std::exception& ex) {
return write_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what());
} catch (...) {
return write_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error");
}
});
}
future<> cql_server::connection::process_request() {
return read_frame().then_wrapped([this] (future<std::experimental::optional<cql_binary_frame_v3>>&& v) {
auto maybe_frame = std::get<0>(v.get());
@@ -390,44 +437,30 @@ future<> cql_server::connection::process_request() {
// eof
return make_ready_future<>();
}
auto& f = *maybe_frame;
return _read_buf.read_exactly(f.length).then([this, f] (temporary_buffer<char> buf) {
// FIXME: compression
if (f.flags & 0x01) {
throw std::runtime_error("CQL frame compression is not supported");
}
// FIXME: compression
if (f.flags & 0x01) {
throw std::runtime_error("CQL frame compression is not supported");
}
auto op = f.opcode;
auto stream = f.stream;
return _read_buf.read_exactly(f.length).then([this, op, stream] (temporary_buffer<char> buf) {
++_server._requests_served;
++_server._requests_serving;
switch (static_cast<cql_binary_opcode>(f.opcode)) {
case cql_binary_opcode::STARTUP: return process_startup(f.stream, std::move(buf));
case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(f.stream, std::move(buf));
case cql_binary_opcode::OPTIONS: return process_options(f.stream, std::move(buf));
case cql_binary_opcode::QUERY: return process_query(f.stream, std::move(buf));
case cql_binary_opcode::PREPARE: return process_prepare(f.stream, std::move(buf));
case cql_binary_opcode::EXECUTE: return process_execute(f.stream, std::move(buf));
case cql_binary_opcode::BATCH: return process_batch(f.stream, std::move(buf));
case cql_binary_opcode::REGISTER: return process_register(f.stream, std::move(buf));
default: throw exceptions::protocol_exception(sprint("Unknown opcode %d", f.opcode));
};
}).then_wrapped([stream = f.stream, this] (future<> f) {
--_server._requests_serving;
try {
f.get();
} catch (const exceptions::unavailable_exception& ex) {
write_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive);
} catch (const exceptions::read_timeout_exception& ex) {
write_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present);
} catch (const exceptions::already_exists_exception& ex) {
write_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name);
} catch (const exceptions::prepared_query_not_found_exception& ex) {
write_unprepared_error(stream, ex.code(), ex.what(), ex.id);
} catch (const exceptions::cassandra_exception& ex) {
write_error(stream, ex.code(), ex.what());
} catch (std::exception& ex) {
write_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what());
} catch (...) {
write_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error");
}
with_gate(
_pending_requests_gate,
[this, op, stream, buf = std::move(buf)] () mutable {
return process_request_one(std::move(buf), op, stream);
}
);
return make_ready_future<>();
});
});
}