mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 03:20:37 +00:00
Merge "Clean up CQL query options" from Pekka
"Clean up various issues with the query_options class to make it easier to work on."
This commit is contained in:
@@ -154,7 +154,7 @@ public:
|
||||
|
||||
virtual bytes_opt bind_and_get(const query_options& options) override {
|
||||
try {
|
||||
auto value = options.get_values().at(_bind_index);
|
||||
auto value = options.get_value_at(_bind_index);
|
||||
if (value) {
|
||||
_receiver->type->validate(value.value());
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ lists::delayed_value::bind(const query_options& options) {
|
||||
|
||||
::shared_ptr<terminal>
|
||||
lists::marker::bind(const query_options& options) {
|
||||
const bytes_opt& value = options.get_values()[_bind_index];
|
||||
const bytes_opt& value = options.get_value_at(_bind_index);
|
||||
auto ltype = static_pointer_cast<const list_type_impl>(_receiver->type);
|
||||
if (!value) {
|
||||
return nullptr;
|
||||
|
||||
@@ -32,12 +32,120 @@ thread_local const query_options::specific_options query_options::specific_optio
|
||||
thread_local query_options query_options::DEFAULT{db::consistency_level::ONE, std::experimental::nullopt,
|
||||
{}, false, query_options::specific_options::DEFAULT, version::native_protocol(), serialization_format::use_32_bit()};
|
||||
|
||||
query_options::query_options(db::consistency_level consistency,
|
||||
std::experimental::optional<std::vector<sstring>> names,
|
||||
std::vector<bytes_opt> values,
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
int32_t protocol_version,
|
||||
serialization_format sf)
|
||||
: _consistency(consistency)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
, _skip_metadata(skip_metadata)
|
||||
, _options(std::move(options))
|
||||
, _protocol_version(protocol_version)
|
||||
, _serialization_format(sf)
|
||||
{
|
||||
}
|
||||
|
||||
query_options::query_options(std::vector<bytes_opt> values)
|
||||
: query_options(db::consistency_level::ONE, { }, std::move(values),
|
||||
false, query_options::specific_options::DEFAULT, version::native_protocol(),
|
||||
serialization_format::use_32_bit()) {
|
||||
: query_options(
|
||||
db::consistency_level::ONE,
|
||||
{},
|
||||
std::move(values),
|
||||
false,
|
||||
query_options::specific_options::DEFAULT,
|
||||
version::native_protocol(),
|
||||
serialization_format::use_32_bit()
|
||||
)
|
||||
{
|
||||
}
|
||||
|
||||
db::consistency_level query_options::get_consistency() const
|
||||
{
|
||||
return _consistency;
|
||||
}
|
||||
|
||||
const bytes_opt& query_options::get_value_at(size_t idx) const
|
||||
{
|
||||
return _values.at(idx);
|
||||
}
|
||||
|
||||
size_t query_options::get_values_count() const
|
||||
{
|
||||
return _values.size();
|
||||
}
|
||||
|
||||
bool query_options::skip_metadata() const
|
||||
{
|
||||
return _skip_metadata;
|
||||
}
|
||||
|
||||
int32_t query_options::get_page_size() const
|
||||
{
|
||||
return get_specific_options().page_size;
|
||||
}
|
||||
|
||||
::shared_ptr<service::pager::paging_state> query_options::get_paging_state() const
|
||||
{
|
||||
return get_specific_options().state;
|
||||
}
|
||||
|
||||
std::experimental::optional<db::consistency_level> query_options::get_serial_consistency() const
|
||||
{
|
||||
return get_specific_options().serial_consistency;
|
||||
}
|
||||
|
||||
api::timestamp_type query_options::get_timestamp(service::query_state& state) const
|
||||
{
|
||||
auto tstamp = get_specific_options().timestamp;
|
||||
return tstamp != api::missing_timestamp ? tstamp : state.get_timestamp();
|
||||
}
|
||||
|
||||
int query_options::get_protocol_version() const
|
||||
{
|
||||
return _protocol_version;
|
||||
}
|
||||
|
||||
serialization_format query_options::get_serialization_format() const
|
||||
{
|
||||
return _serialization_format;
|
||||
}
|
||||
|
||||
const query_options::specific_options& query_options::get_specific_options() const
|
||||
{
|
||||
return _options;
|
||||
}
|
||||
|
||||
const query_options& query_options::for_statement(size_t i) const
|
||||
{
|
||||
if (!_batch_options) {
|
||||
// No per-statement options supplied, so use the "global" options
|
||||
return *this;
|
||||
}
|
||||
return _batch_options->at(i);
|
||||
}
|
||||
|
||||
void query_options::prepare(const std::vector<::shared_ptr<column_specification>>& specs)
|
||||
{
|
||||
if (!_names) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto& names = *_names;
|
||||
std::vector<bytes_opt> ordered_values;
|
||||
ordered_values.reserve(specs.size());
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < names.size(); j++) {
|
||||
if (names[j] == spec_name) {
|
||||
ordered_values.emplace_back(_values[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_values = std::move(ordered_values);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -60,197 +60,40 @@ private:
|
||||
std::experimental::optional<std::vector<query_options>> _batch_options;
|
||||
public:
|
||||
explicit query_options(db::consistency_level consistency,
|
||||
std::experimental::optional<std::vector<sstring>> names, std::vector<bytes_opt> values,
|
||||
bool skip_metadata, specific_options options, int32_t protocol_version, serialization_format sf)
|
||||
: _consistency(consistency)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
, _skip_metadata(skip_metadata)
|
||||
, _options(std::move(options))
|
||||
, _protocol_version(protocol_version)
|
||||
, _serialization_format(sf)
|
||||
{ }
|
||||
std::experimental::optional<std::vector<sstring>> names,
|
||||
std::vector<bytes_opt> values,
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
int32_t protocol_version,
|
||||
serialization_format sf);
|
||||
|
||||
// It can't be const because of prepare()
|
||||
static thread_local query_options DEFAULT;
|
||||
|
||||
// forInternalUse
|
||||
explicit query_options(std::vector<bytes_opt> values);
|
||||
#if 0
|
||||
public static final CBCodec<QueryOptions> codec = new Codec();
|
||||
|
||||
public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
|
||||
{
|
||||
return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
|
||||
}
|
||||
|
||||
public static QueryOptions fromProtocolV2(ConsistencyLevel consistency, List<ByteBuffer> values)
|
||||
{
|
||||
return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 2);
|
||||
}
|
||||
|
||||
public static QueryOptions forInternalCalls(ConsistencyLevel consistency, List<ByteBuffer> values)
|
||||
{
|
||||
return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 3);
|
||||
}
|
||||
|
||||
public static QueryOptions forInternalCalls(List<ByteBuffer> values)
|
||||
{
|
||||
return new DefaultQueryOptions(ConsistencyLevel.ONE, values, false, SpecificOptions.DEFAULT, 3);
|
||||
}
|
||||
|
||||
public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
|
||||
{
|
||||
return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);
|
||||
}
|
||||
|
||||
public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency)
|
||||
{
|
||||
return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
db::consistency_level get_consistency() const {
|
||||
return _consistency;
|
||||
}
|
||||
const std::vector<bytes_opt>& get_values() const {
|
||||
return _values;
|
||||
}
|
||||
bool skip_metadata() const {
|
||||
return _skip_metadata;
|
||||
}
|
||||
|
||||
db::consistency_level get_consistency() const;
|
||||
const bytes_opt& get_value_at(size_t idx) const;
|
||||
size_t get_values_count() const;
|
||||
bool skip_metadata() const;
|
||||
/** The pageSize for this query. Will be <= 0 if not relevant for the query. */
|
||||
int32_t get_page_size() const { return get_specific_options().page_size; }
|
||||
|
||||
int32_t get_page_size() const;
|
||||
/** The paging state for this query, or null if not relevant. */
|
||||
::shared_ptr<service::pager::paging_state> get_paging_state() const {
|
||||
return get_specific_options().state;
|
||||
}
|
||||
|
||||
::shared_ptr<service::pager::paging_state> get_paging_state() const;
|
||||
/** Serial consistency for conditional updates. */
|
||||
std::experimental::optional<db::consistency_level> get_serial_consistency() const {
|
||||
return get_specific_options().serial_consistency;
|
||||
}
|
||||
|
||||
api::timestamp_type get_timestamp(service::query_state& state) const {
|
||||
auto tstamp = get_specific_options().timestamp;
|
||||
return tstamp != api::missing_timestamp ? tstamp : state.get_timestamp();
|
||||
}
|
||||
|
||||
std::experimental::optional<db::consistency_level> get_serial_consistency() const;
|
||||
api::timestamp_type get_timestamp(service::query_state& state) const;
|
||||
/**
|
||||
* The protocol version for the query. Will be 3 if the object don't come from
|
||||
* a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
|
||||
*/
|
||||
int get_protocol_version() const {
|
||||
return _protocol_version;
|
||||
}
|
||||
serialization_format get_serialization_format() const { return _serialization_format; }
|
||||
|
||||
int get_protocol_version() const;
|
||||
serialization_format get_serialization_format() const;
|
||||
// Mainly for the sake of BatchQueryOptions
|
||||
const specific_options& get_specific_options() const {
|
||||
return _options;
|
||||
}
|
||||
|
||||
const query_options& for_statement(size_t i) const {
|
||||
if (!_batch_options) {
|
||||
// No per-statement options supplied, so use the "global" options
|
||||
return *this;
|
||||
}
|
||||
return _batch_options->at(i);
|
||||
}
|
||||
|
||||
void prepare(const std::vector<::shared_ptr<column_specification>>& specs) {
|
||||
if (!_names) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto& names = *_names;
|
||||
std::vector<bytes_opt> ordered_values;
|
||||
ordered_values.reserve(specs.size());
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < names.size(); j++) {
|
||||
if (names[j] == spec_name) {
|
||||
ordered_values.emplace_back(_values[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_values = std::move(ordered_values);
|
||||
}
|
||||
|
||||
#if 0
|
||||
private static class Codec implements CBCodec<QueryOptions>
|
||||
{
|
||||
|
||||
public void encode(QueryOptions options, ByteBuf dest, int version)
|
||||
{
|
||||
assert version >= 2;
|
||||
|
||||
CBUtil.writeConsistencyLevel(options.getConsistency(), dest);
|
||||
|
||||
EnumSet<Flag> flags = gatherFlags(options);
|
||||
dest.writeByte((byte)Flag.serialize(flags));
|
||||
|
||||
if (flags.contains(Flag.VALUES))
|
||||
CBUtil.writeValueList(options.getValues(), dest);
|
||||
if (flags.contains(Flag.PAGE_SIZE))
|
||||
dest.writeInt(options.getPageSize());
|
||||
if (flags.contains(Flag.PAGING_STATE))
|
||||
CBUtil.writeValue(options.getPagingState().serialize(), dest);
|
||||
if (flags.contains(Flag.SERIAL_CONSISTENCY))
|
||||
CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
|
||||
if (flags.contains(Flag.TIMESTAMP))
|
||||
dest.writeLong(options.getSpecificOptions().timestamp);
|
||||
|
||||
// Note that we don't really have to bother with NAMES_FOR_VALUES server side,
|
||||
// and in fact we never really encode QueryOptions, only decode them, so we
|
||||
// don't bother.
|
||||
}
|
||||
|
||||
public int encodedSize(QueryOptions options, int version)
|
||||
{
|
||||
int size = 0;
|
||||
|
||||
size += CBUtil.sizeOfConsistencyLevel(options.getConsistency());
|
||||
|
||||
EnumSet<Flag> flags = gatherFlags(options);
|
||||
size += 1;
|
||||
|
||||
if (flags.contains(Flag.VALUES))
|
||||
size += CBUtil.sizeOfValueList(options.getValues());
|
||||
if (flags.contains(Flag.PAGE_SIZE))
|
||||
size += 4;
|
||||
if (flags.contains(Flag.PAGING_STATE))
|
||||
size += CBUtil.sizeOfValue(options.getPagingState().serialize());
|
||||
if (flags.contains(Flag.SERIAL_CONSISTENCY))
|
||||
size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
|
||||
if (flags.contains(Flag.TIMESTAMP))
|
||||
size += 8;
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
private EnumSet<Flag> gatherFlags(QueryOptions options)
|
||||
{
|
||||
EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
|
||||
if (options.getValues().size() > 0)
|
||||
flags.add(Flag.VALUES);
|
||||
if (options.skipMetadata())
|
||||
flags.add(Flag.SKIP_METADATA);
|
||||
if (options.getPageSize() >= 0)
|
||||
flags.add(Flag.PAGE_SIZE);
|
||||
if (options.getPagingState() != null)
|
||||
flags.add(Flag.PAGING_STATE);
|
||||
if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
|
||||
flags.add(Flag.SERIAL_CONSISTENCY);
|
||||
if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
|
||||
flags.add(Flag.TIMESTAMP);
|
||||
return flags;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
const specific_options& get_specific_options() const;
|
||||
const query_options& for_statement(size_t i) const;
|
||||
void prepare(const std::vector<::shared_ptr<column_specification>>& specs);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ query_processor::process(const sstring_view& query_string, service::query_state&
|
||||
auto p = get_statement(query_string, query_state.get_client_state());
|
||||
options.prepare(p->bound_names);
|
||||
auto cql_statement = p->statement;
|
||||
if (cql_statement->get_bound_terms() != options.get_values().size()) {
|
||||
if (cql_statement->get_bound_terms() != options.get_values_count()) {
|
||||
throw exceptions::invalid_request_exception("Invalid amount of bind variables");
|
||||
}
|
||||
|
||||
|
||||
@@ -193,7 +193,7 @@ sets::delayed_value::bind(const query_options& options) {
|
||||
|
||||
::shared_ptr<terminal>
|
||||
sets::marker::bind(const query_options& options) {
|
||||
auto value = options.get_values().at(_bind_index);
|
||||
auto value = options.get_value_at(_bind_index);
|
||||
if (!value) {
|
||||
return nullptr;
|
||||
} else {
|
||||
|
||||
@@ -372,7 +372,7 @@ public:
|
||||
{ }
|
||||
|
||||
virtual shared_ptr<terminal> bind(const query_options& options) override {
|
||||
auto value = options.get_values().at(_bind_index);
|
||||
auto value = options.get_value_at(_bind_index);
|
||||
if (!value) {
|
||||
return nullptr;
|
||||
} else {
|
||||
@@ -394,7 +394,7 @@ public:
|
||||
}
|
||||
|
||||
virtual shared_ptr<terminal> bind(const query_options& options) override {
|
||||
auto value = options.get_values().at(_bind_index);
|
||||
auto value = options.get_value_at(_bind_index);
|
||||
if (!value) {
|
||||
return nullptr;
|
||||
} else {
|
||||
|
||||
@@ -517,7 +517,7 @@ future<> cql_server::connection::process_execute(uint16_t stream, temporary_buff
|
||||
auto& options = *q_state.options;
|
||||
options.prepare(prepared->bound_names);
|
||||
auto stmt = prepared->statement;
|
||||
if (stmt->get_bound_terms() != options.get_values().size()) {
|
||||
if (stmt->get_bound_terms() != options.get_values_count()) {
|
||||
throw exceptions::invalid_request_exception("Invalid amount of bind variables");
|
||||
}
|
||||
return _server._query_processor.local().process_statement(stmt, query_state, options).then([this, stream] (auto msg) {
|
||||
|
||||
Reference in New Issue
Block a user