transport/server: Remove _query_states from connection

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
Pekka Enberg
2015-09-15 10:33:12 +03:00
committed by Pekka Enberg
parent 3884f1c8b6
commit 3b6eba1344
2 changed files with 19 additions and 21 deletions

View File

@@ -477,15 +477,6 @@ future<> cql_server::connection::process_options(uint16_t stream, temporary_buff
return write_supported(stream);
}
cql_query_state& cql_server::connection::get_query_state(uint16_t stream)
{
auto i = _query_states.find(stream);
if (i == _query_states.end()) {
i = _query_states.emplace(stream, _client_state).first;
}
return i->second;
}
void
cql_server::connection::init_serialization_format() {
if (_version < 3) {
@@ -498,10 +489,14 @@ cql_server::connection::init_serialization_format() {
future<> cql_server::connection::process_query(uint16_t stream, temporary_buffer<char> buf)
{
auto query = read_long_string_view(buf);
auto& q_state = get_query_state(stream);
q_state.options = read_options(buf);
return _server._query_processor.local().process(query, q_state.query_state, *q_state.options).then([this, stream, buf = std::move(buf)] (auto msg) {
auto q_state = std::make_unique<cql_query_state>(_client_state);
auto& query_state = q_state->query_state;
q_state->options = read_options(buf);
auto& options = *q_state->options;
return _server._query_processor.local().process(query, query_state, options).then([this, stream, buf = std::move(buf)] (auto msg) {
return this->write_result(stream, msg);
}).then([q_state = std::move(q_state)] {
/* Keep q_state alive. */
});
}
@@ -535,10 +530,10 @@ future<> cql_server::connection::process_execute(uint16_t stream, temporary_buff
if (!prepared) {
throw exceptions::prepared_query_not_found_exception(id);
}
auto& q_state = get_query_state(stream);
auto& query_state = q_state.query_state;
q_state.options = read_options(buf);
auto& options = *q_state.options;
auto q_state = std::make_unique<cql_query_state>(_client_state);
auto& query_state = q_state->query_state;
q_state->options = read_options(buf);
auto& options = *q_state->options;
options.prepare(prepared->bound_names);
auto stmt = prepared->statement;
if (stmt->get_bound_terms() != options.get_values_count()) {
@@ -546,6 +541,8 @@ future<> cql_server::connection::process_execute(uint16_t stream, temporary_buff
}
return _server._query_processor.local().process_statement(stmt, query_state, options).then([this, stream, buf = std::move(buf)] (auto msg) {
return this->write_result(stream, msg);
}).then([q_state = std::move(q_state)] {
/* Keep q_state alive. */
});
}
@@ -607,14 +604,16 @@ future<> cql_server::connection::process_batch(uint16_t stream, temporary_buffer
values.emplace_back(std::move(tmp));
}
auto& q_state = get_query_state(stream);
auto& query_state = q_state.query_state;
q_state.options = std::make_unique<cql3::query_options>(std::move(*read_options(buf)), std::move(values));
auto& options = *q_state.options;
auto q_state = std::make_unique<cql_query_state>(_client_state);
auto& query_state = q_state->query_state;
q_state->options = std::make_unique<cql3::query_options>(std::move(*read_options(buf)), std::move(values));
auto& options = *q_state->options;
auto batch = ::make_shared<cql3::statements::batch_statement>(-1, cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none());
return _server._query_processor.local().process_batch(batch, query_state, options).then([this, stream, batch] (auto msg) {
return this->write_result(stream, msg);
}).then([q_state = std::move(q_state)] {
/* Keep q_state alive. */
});
}

View File

@@ -206,7 +206,6 @@ private:
std::unordered_map<sstring, sstring> read_string_map(temporary_buffer<char>& buf);
std::unique_ptr<cql3::query_options> read_options(temporary_buffer<char>& buf);
cql_query_state& get_query_state(uint16_t stream);
void init_serialization_format();
friend event_notifier;