Merge "server::process_batch" from Calle

"Fixes #332

Implementation of "native protocol message of type BATCH", i.e. Origin
BatchMessage."
This commit is contained in:
Avi Kivity
2015-09-16 15:38:20 +03:00
6 changed files with 117 additions and 21 deletions

View File

@@ -71,6 +71,17 @@ query_options::query_options(db::consistency_level consistency,
{
}
query_options::query_options(query_options&& o, std::vector<std::vector<bytes_view_opt>> value_views)
: query_options(std::move(o))
{
std::vector<query_options> tmp;
tmp.reserve(value_views.size());
std::transform(value_views.begin(), value_views.end(), std::back_inserter(tmp), [this](auto& vals) {
return query_options(_consistency, {}, vals, _skip_metadata, _options, _protocol_version, _serialization_format);
});
_batch_options = std::move(tmp);
}
query_options::query_options(std::vector<bytes_opt> values)
: query_options(
db::consistency_level::ONE,

View File

@@ -61,6 +61,9 @@ private:
serialization_format _serialization_format;
std::experimental::optional<std::vector<query_options>> _batch_options;
public:
query_options(query_options&&) = default;
query_options(const query_options&) = delete;
explicit query_options(db::consistency_level consistency,
std::experimental::optional<std::vector<sstring_view>> names,
std::vector<bytes_opt> values,
@@ -77,6 +80,16 @@ public:
int32_t protocol_version,
serialization_format sf);
explicit query_options(db::consistency_level consistency,
std::vector<std::vector<bytes_view_opt>> value_views,
bool skip_metadata,
specific_options options,
int32_t protocol_version,
serialization_format sf);
// Batch query_options constructor
explicit query_options(query_options&&, std::vector<std::vector<bytes_view_opt>> value_views);
// It can't be const because of prepare()
static thread_local query_options DEFAULT;

View File

@@ -25,6 +25,7 @@
#include "cql3/query_processor.hh"
#include "cql3/CqlParser.hpp"
#include "cql3/error_collector.hh"
#include "cql3/statements/batch_statement.hh"
#include "transport/messages/result_message.hh"
@@ -311,4 +312,14 @@ future<::shared_ptr<untyped_result_set>> query_processor::execute_internal(
});
}
future<::shared_ptr<transport::messages::result_message>>
query_processor::process_batch(::shared_ptr<statements::batch_statement> batch, service::query_state& query_state, query_options& options) {
auto& client_state = query_state.get_client_state();
batch->check_access(client_state);
batch->validate();
batch->validate(_proxy, client_state);
return batch->execute(_proxy, query_state, options);
}
}

View File

@@ -39,6 +39,10 @@
namespace cql3 {
namespace statements {
class batch_statement;
}
class query_processor {
private:
distributed<service::storage_proxy>& _proxy;
@@ -425,19 +429,12 @@ private:
metrics.preparedStatementsExecuted.inc();
return processStatement(statement, queryState, options);
}
public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options)
throws RequestExecutionException, RequestValidationException
{
ClientState clientState = queryState.getClientState();
batch.checkAccess(clientState);
batch.validate();
batch.validate(clientState);
return batch.execute(queryState, options);
}
#endif
public:
future<::shared_ptr<transport::messages::result_message>> process_batch(::shared_ptr<statements::batch_statement>,
service::query_state& query_state, query_options& options);
::shared_ptr<statements::parsed_statement::prepared> get_statement(const std::experimental::string_view& query,
const service::client_state& client_state);
static ::shared_ptr<statements::parsed_statement> parse_statement(const std::experimental::string_view& query);

View File

@@ -214,24 +214,24 @@ future<> stream_session::test(distributed<cql3::query_processor>& qp) {
sslog.debug("================ STREAM_PLAN TEST ==============");
auto cs = service::client_state::for_external_calls();
service::query_state qs(cs);
auto opts = make_shared<cql3::query_options>(cql3::query_options::DEFAULT);
qp.local().process("CREATE KEYSPACE ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };", qs, *opts).get();
auto& opts = cql3::query_options::DEFAULT;
qp.local().process("CREATE KEYSPACE ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };", qs, opts).get();
sslog.debug("CREATE KEYSPACE = KS DONE");
sleep(std::chrono::seconds(3)).get();
qp.local().process("CREATE TABLE ks.tb ( key text PRIMARY KEY, C0 text, C1 text, C2 text, C3 blob, C4 text);", qs, *opts).get();
qp.local().process("CREATE TABLE ks.tb ( key text PRIMARY KEY, C0 text, C1 text, C2 text, C3 blob, C4 text);", qs, opts).get();
sslog.debug("CREATE TABLE = TB DONE");
sleep(std::chrono::seconds(3)).get();
qp.local().process("insert into ks.tb (key,c0) values ('1','1');", qs, *opts).get();
qp.local().process("insert into ks.tb (key,c0) values ('1','1');", qs, opts).get();
sslog.debug("INSERT VALUE DONE: 1");
qp.local().process("insert into ks.tb (key,c0) values ('2','2');", qs, *opts).get();
qp.local().process("insert into ks.tb (key,c0) values ('2','2');", qs, opts).get();
sslog.debug("INSERT VALUE DONE: 2");
qp.local().process("insert into ks.tb (key,c0) values ('3','3');", qs, *opts).get();
qp.local().process("insert into ks.tb (key,c0) values ('3','3');", qs, opts).get();
sslog.debug("INSERT VALUE DONE: 3");
qp.local().process("insert into ks.tb (key,c0) values ('4','4');", qs, *opts).get();
qp.local().process("insert into ks.tb (key,c0) values ('4','4');", qs, opts).get();
sslog.debug("INSERT VALUE DONE: 4");
qp.local().process("insert into ks.tb (key,c0) values ('5','5');", qs, *opts).get();
qp.local().process("insert into ks.tb (key,c0) values ('5','5');", qs, opts).get();
sslog.debug("INSERT VALUE DONE: 5");
qp.local().process("insert into ks.tb (key,c0) values ('6','6');", qs, *opts).get();
qp.local().process("insert into ks.tb (key,c0) values ('6','6');", qs, opts).get();
sslog.debug("INSERT VALUE DONE: 6");
}).then([] {
sleep(std::chrono::seconds(10)).then([] {

View File

@@ -11,6 +11,7 @@
#include <boost/locale/encoding_utf.hpp>
#include <boost/range/adaptor/sliced.hpp>
#include "cql3/statements/batch_statement.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "db/consistency_level.hh"
@@ -533,8 +534,71 @@ future<> cql_server::connection::process_execute(uint16_t stream, temporary_buff
future<> cql_server::connection::process_batch(uint16_t stream, temporary_buffer<char> buf)
{
assert(0);
return make_ready_future<>();
if (_version == 1) {
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
}
const auto type = read_byte(buf);
const unsigned n = read_unsigned_short(buf);
std::vector<shared_ptr<cql3::statements::modification_statement>> modifications;
std::vector<std::vector<bytes_view_opt>> values;
modifications.reserve(n);
values.reserve(n);
for ([[gnu::unused]] auto i : boost::irange(0u, n)) {
const auto kind = read_byte(buf);
::shared_ptr<cql3::statements::parsed_statement::prepared> ps;
switch (kind) {
case 0: {
auto query = read_long_string_view(buf).to_string();
ps = _server._query_processor.local().get_statement(query,
_client_state);
break;
}
case 1: {
auto id = read_short_bytes(buf);
ps = _server._query_processor.local().get_prepared(id);
if (!ps) {
throw exceptions::prepared_query_not_found_exception(id);
}
break;
}
default:
throw exceptions::protocol_exception(
"Invalid query kind in BATCH messages. Must be 0 or 1 but got "
+ std::to_string(int(kind)));
}
if (dynamic_cast<cql3::statements::modification_statement*>(ps->statement.get()) == nullptr) {
throw exceptions::invalid_request_exception("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
}
modifications.emplace_back(static_pointer_cast<cql3::statements::modification_statement>(ps->statement));
std::vector<bytes_view_opt> tmp;
read_value_view_list(buf, tmp);
auto stmt = ps->statement;
if (stmt->get_bound_terms() != tmp.size()) {
throw exceptions::invalid_request_exception(sprint("There were %d markers(?) in CQL but %d bound variables",
stmt->get_bound_terms(), tmp.size()));
}
values.emplace_back(std::move(tmp));
}
auto& q_state = get_query_state(stream);
auto& query_state = q_state.query_state;
q_state.options = std::make_unique<cql3::query_options>(std::move(*read_options(buf)), std::move(values));
auto& options = *q_state.options;
auto batch = ::make_shared<cql3::statements::batch_statement>(-1, cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none());
return _server._query_processor.local().process_batch(batch, query_state, options).then([this, stream, batch] (auto msg) {
return this->write_result(stream, msg);
});
}
future<> cql_server::connection::process_register(uint16_t stream, temporary_buffer<char> buf)