Merge 'ALLOW FILTERING for indexed queries' from Piotr

"
Previous series on ALLOW FILTERING introduced it for regular queries,
but it's also possible to have an indexed query which requires
filtering. This series contains minor fixes that allow treating
indexed+filtered queries properly. The most important part is having
a more selective approach to extracting values from restrictions
in the read_posting_list() helper function. Before ALLOW FILTERING,
restrictions contained only a single entry that matched the indexed
column, but that is no longer the case with filtering (and it won't be
the case with multiple indexing support).

This series also comes with test cases for indexed+filtered queries.

Tests: unit (release)
"

* 'allow_filtering_and_si_3' of https://github.com/psarna/scylla:
  tests: add filtering indexed queries tests
  cql3: use single restriction value in index creation
  cql3: add secondary index condition to need_filtering
  cql3: add value_for method
  cql3: add missing inline declarations to restrictions
  cql3: make index detection more specific
  index: add target_column getter to index
This commit is contained in:
Duarte Nunes
2018-07-12 00:17:36 +01:00
7 changed files with 193 additions and 20 deletions

View File

@@ -68,6 +68,10 @@ public:
virtual std::vector<bytes_opt> values(const query_options& options) const = 0;
/**
 * Returns the single value that the restrictions bind to the column <code>cdef</code>.
 *
 * This default implementation never returns: it always throws
 * exceptions::invalid_request_exception. Restriction types that can
 * produce a single value for a column are expected to override it.
 *
 * @param cdef the column whose restricted value is requested
 * @param options the query options (unused by this default implementation)
 * @throws exceptions::invalid_request_exception always
 */
virtual bytes_opt value_for(const column_definition& cdef, const query_options& options) const {
throw exceptions::invalid_request_exception("Single value can be obtained from single-column restrictions only");
}
/**
* Returns <code>true</code> if one of the restrictions use the specified function.
*

View File

@@ -49,6 +49,7 @@
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
namespace cql3 {
@@ -309,6 +310,11 @@ public:
}
return res;
}
/**
 * Returns the value restricting the column <code>cdef</code>.
 *
 * Pure forwarding: the lookup is delegated unchanged to the wrapped
 * <code>_restrictions</code> object, so the result (and any exception)
 * is whatever that object's value_for() produces.
 *
 * @param cdef the column whose restricted value is requested
 * @param options the query options passed through to the wrapped restrictions
 */
virtual bytes_opt value_for(const column_definition& cdef, const query_options& options) const override {
return _restrictions->value_for(cdef, options);
}
std::vector<bytes_opt> bounds(statements::bound b, const query_options& options) const override {
// TODO: if this proved to be required.
fail(unimplemented::cause::LEGACY_COMPOSITE_KEYS); // not 100% correct...
@@ -358,7 +364,7 @@ public:
};
template<>
dht::partition_range_vector
inline dht::partition_range_vector
single_column_primary_key_restrictions<partition_key>::bounds_ranges(const query_options& options) const {
dht::partition_range_vector ranges;
ranges.reserve(size());
@@ -376,7 +382,7 @@ single_column_primary_key_restrictions<partition_key>::bounds_ranges(const query
}
template<>
std::vector<query::clustering_range>
inline std::vector<query::clustering_range>
single_column_primary_key_restrictions<clustering_key_prefix>::bounds_ranges(const query_options& options) const {
auto wrapping_bounds = compute_bounds(options);
auto bounds = boost::copy_range<query::clustering_row_ranges>(wrapping_bounds
@@ -413,12 +419,12 @@ single_column_primary_key_restrictions<clustering_key_prefix>::bounds_ranges(con
}
template<>
bool single_column_primary_key_restrictions<partition_key>::needs_filtering(const schema& schema) const {
inline bool single_column_primary_key_restrictions<partition_key>::needs_filtering(const schema& schema) const {
return primary_key_restrictions<partition_key>::needs_filtering(schema);
}
template<>
bool single_column_primary_key_restrictions<clustering_key>::needs_filtering(const schema& schema) const {
inline bool single_column_primary_key_restrictions<clustering_key>::needs_filtering(const schema& schema) const {
// Restrictions currently need filtering in three cases:
// 1. any of them is a CONTAINS restriction
// 2. restrictions do not form a contiguous prefix (i.e. there are gaps in it)

View File

@@ -111,6 +111,11 @@ public:
return r;
}
/**
 * Returns the value of the restriction registered for the column <code>cdef</code>.
 *
 * The column is looked up by address in <code>_restrictions</code>. When no
 * restriction is registered for it, an empty <code>bytes_opt</code> is returned;
 * otherwise the result of that restriction's value(options) is returned.
 *
 * @param cdef the column whose restricted value is requested
 * @param options the query options used to evaluate the restriction's value
 */
virtual bytes_opt value_for(const column_definition& cdef, const query_options& options) const override {
    const auto entry = _restrictions.find(std::addressof(cdef));
    if (entry == _restrictions.end()) {
        // No restriction on this column.
        return bytes_opt{};
    }
    return entry->second->value(options);
}
/**
* Returns the restriction associated to the specified column.
*

View File

@@ -72,6 +72,9 @@ public:
// throw? should not reach?
return {};
}
// Always yields an empty (disengaged) bytes_opt, regardless of the column
// or the query options; neither argument is inspected.
bytes_opt value_for(const column_definition& cdef, const query_options& options) const override {
return {};
}
std::vector<T> values_as_keys(const query_options& options) const override {
// throw? should not reach?
return {};
@@ -212,12 +215,13 @@ statement_restrictions::statement_restrictions(database& db,
auto& cf = db.find_column_family(schema);
auto& sim = cf.get_index_manager();
bool has_queriable_clustering_column_index = _clustering_columns_restrictions->has_supporting_index(sim);
bool has_queriable_pk_index = _partition_key_restrictions->has_supporting_index(sim);
bool has_queriable_index = has_queriable_clustering_column_index
|| _partition_key_restrictions->has_supporting_index(sim)
|| has_queriable_pk_index
|| _nonprimary_key_restrictions->has_supporting_index(sim);
// At this point, the select statement is fully constructed, but we still have a few things to validate
process_partition_key_restrictions(has_queriable_index, for_view, allow_filtering);
process_partition_key_restrictions(has_queriable_pk_index, for_view, allow_filtering);
// Some but not all of the partition key columns have been specified;
// hence we need to turn these restrictions into index expressions.
@@ -237,7 +241,7 @@ statement_restrictions::statement_restrictions(database& db,
}
}
process_clustering_columns_restrictions(has_queriable_index, select_a_collection, for_view, allow_filtering);
process_clustering_columns_restrictions(has_queriable_clustering_column_index, select_a_collection, for_view, allow_filtering);
// Covers indexes on the first clustering column (among others).
if (_is_key_range && has_queriable_clustering_column_index) {
@@ -422,10 +426,11 @@ std::vector<query::clustering_range> statement_restrictions::get_clustering_boun
}
bool statement_restrictions::need_filtering() const {
uint32_t number_of_restricted_columns = 0;
uint32_t number_of_restricted_columns_for_indexing = 0;
for (auto&& restrictions : _index_restrictions) {
number_of_restricted_columns += restrictions->size();
number_of_restricted_columns_for_indexing += restrictions->size();
}
int number_of_all_restrictions = _partition_key_restrictions->size() + _clustering_columns_restrictions->size() + _nonprimary_key_restrictions->size();
if (_partition_key_restrictions->is_multi_column() || _clustering_columns_restrictions->is_multi_column()) {
// TODO(sarna): Implement ALLOW FILTERING support for multi-column restrictions - return false for now
@@ -433,10 +438,11 @@ bool statement_restrictions::need_filtering() const {
return false;
}
return number_of_restricted_columns > 1
|| (number_of_restricted_columns == 0 && _partition_key_restrictions->empty() && !_clustering_columns_restrictions->empty())
|| (number_of_restricted_columns != 0 && _nonprimary_key_restrictions->has_multiple_contains())
|| (number_of_restricted_columns != 0 && !_uses_secondary_indexing);
return number_of_restricted_columns_for_indexing > 1
|| (number_of_restricted_columns_for_indexing == 0 && _partition_key_restrictions->empty() && !_clustering_columns_restrictions->empty())
|| (number_of_restricted_columns_for_indexing != 0 && _nonprimary_key_restrictions->has_multiple_contains())
|| (number_of_restricted_columns_for_indexing != 0 && !_uses_secondary_indexing)
|| (_uses_secondary_indexing && number_of_all_restrictions > 1);
}
void statement_restrictions::validate_secondary_index_selections(bool selects_only_static_columns) {

View File

@@ -773,6 +773,8 @@ get_index_schema(service::storage_proxy& proxy,
static future<service::storage_proxy::coordinator_query_result>
read_posting_list(service::storage_proxy& proxy,
schema_ptr view_schema,
schema_ptr base_schema,
const secondary_index::index& index,
const std::vector<::shared_ptr<restrictions::restrictions>>& index_restrictions,
const query_options& options,
int32_t limit,
@@ -784,11 +786,19 @@ read_posting_list(service::storage_proxy& proxy,
// FIXME: there should be only one index restriction for this index!
// Perhaps even one index restriction entirely (do we support
// intersection queries?).
for (const auto& restriction : index_restrictions) {
auto pk = partition_key::from_optional_exploded(*view_schema, restriction->values(options));
auto dk = dht::global_partitioner().decorate_key(*view_schema, pk);
auto range = dht::partition_range::make_singular(dk);
partition_ranges.emplace_back(range);
for (const auto& restrictions : index_restrictions) {
const column_definition* cdef = base_schema->get_column_definition(to_bytes(index.target_column()));
if (!cdef) {
throw exceptions::invalid_request_exception("Indexed column not found in schema");
}
bytes_opt value = restrictions->value_for(*cdef, options);
if (value) {
auto pk = partition_key::from_single_value(*view_schema, *value);
auto dk = dht::global_partitioner().decorate_key(*view_schema, pk);
auto range = dht::partition_range::make_singular(dk);
partition_ranges.emplace_back(range);
}
}
partition_slice_builder partition_slice_builder{*view_schema};
auto cmd = ::make_lw_shared<query::read_command>(
@@ -818,7 +828,7 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, view, _restrictions->index_restrictions(), options, get_limit(options), state, now, timeout).then(
return read_posting_list(proxy, view, _schema, _index, _restrictions->index_restrictions(), options, get_limit(options), state, now, timeout).then(
[this, now, &options, view] (service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : _schema->partition_key_columns()) {
@@ -870,7 +880,7 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
schema_ptr view = get_index_schema(proxy, _index, _schema, state.get_trace_state());
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + options.get_timeout_config().*get_timeout_config_selector();
return read_posting_list(proxy, view, _restrictions->index_restrictions(), options, get_limit(options), state, now, timeout).then(
return read_posting_list(proxy, view, _schema, _index, _restrictions->index_restrictions(), options, get_limit(options), state, now, timeout).then(
[this, now, &options, view] (service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : _schema->partition_key_columns()) {

View File

@@ -60,6 +60,9 @@ public:
bool depends_on(const column_definition& cdef) const;
bool supports_expression(const column_definition& cdef, const cql3::operator_type& op) const;
const index_metadata& metadata() const;
/**
 * Returns the stored target column string of this index (<code>_target_column</code>).
 *
 * The result is a reference to the member, so it must not be used after
 * this index object is destroyed.
 */
const sstring& target_column() const {
return _target_column;
}
};
class secondary_index_manager {

View File

@@ -3331,3 +3331,142 @@ SEASTAR_TEST_CASE(test_allow_filtering_multiple_regular) {
}});
});
}
// Checks SELECT statements that combine a secondary-index restriction with
// additional restrictions that require ALLOW FILTERING.
//
// Part 1 uses table `t` (index on regular column c); part 2 uses table `t2`
// (composite partition key, two clustering columns, index on regular column v).
SEASTAR_TEST_CASE(test_allow_filtering_with_secondary_index) {
return do_with_cql_env_thread([] (cql_test_env& e) {
// --- Part 1: single partition key, index on c ---
e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY(a, b));").get();
e.require_table_exists("ks", "t").get();
e.execute_cql("CREATE INDEX ON t(c)").get();
// Four rows in partition a=1; two of them share c = 5 and differ in d.
e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 1, 1, 1, 1)").get();
e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 3, 4, 5)").get();
e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 3, 5, 1, 9)").get();
e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 4, 5, 7, 5)").get();
// Restriction on the indexed column only: no ALLOW FILTERING needed.
// NOTE(review): unlike the t2 queries below, these reads are not wrapped in
// eventually() — confirm the index is guaranteed to be up to date here.
auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 3").get0();
assert_that(msg).is_rows().with_rows({{
int32_type->decompose(1),
int32_type->decompose(2),
int32_type->decompose(3),
int32_type->decompose(4),
int32_type->decompose(5)
}});
// Adding a restriction on the non-indexed column d must be rejected
// unless ALLOW FILTERING is given.
BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 5 and d = 1").get(), exceptions::invalid_request_exception);
// With ALLOW FILTERING, only the c = 5 row that also has d = 1 is returned.
msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 5 and d = 1 ALLOW FILTERING").get0();
assert_that(msg).is_rows().with_rows({{
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(5),
int32_type->decompose(1),
int32_type->decompose(9)
}});
// --- Part 2: composite partition key (pk1, pk2), clustering (c1, c2), index on v ---
e.execute_cql("CREATE TABLE t2 (pk1 int, pk2 int, c1 int, c2 int, v int, PRIMARY KEY ((pk1, pk2), c1, c2));").get();
e.execute_cql("CREATE INDEX ON t2(v)").get();
// For every i in 1..5 and j in 1..2, write v = i into the rows
// (j,1,1,1), (j,1,1,i), (j,1,i,i) and (j,i,i,i). The row (j,1,1,1) is
// rewritten on every iteration of i, so it ends up with v = 5.
for (int i = 1; i <= 5; ++i) {
for (int j = 1; j <= 2; ++j) {
e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, 1, 1, 1, i)).get();
e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, 1, 1, i, i)).get();
e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, 1, i, i, i)).get();
e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, i, i, i, i)).get();
}
}
// Each check below runs through eventually(); presumably it retries the
// lambda until the assertions pass — confirm against its definition.

// The only rows with c2 = 1 are (j,1,1,1), whose final v is 5, so v = 3 matches nothing.
eventually([&] {
auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 0 AND c1 < 5 AND c2 = 1 AND v = 3 ALLOW FILTERING;").get0();
assert_that(msg).is_rows().with_rows({});
});
// v = 3 with c2 = 3 and pk1 = 1: the three rows written for i = 3 (other than (1,1,1,1)).
eventually([&] {
auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 0 AND c1 < 5 AND c2 = 3 AND v = 3 ALLOW FILTERING;").get0();
assert_that(msg).is_rows().with_rows({
{
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3)
},
{
int32_type->decompose(1),
int32_type->decompose(1),
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(3)
},
{
int32_type->decompose(1),
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3)
}
});
});
// v = 1 was only ever written to (j,1,1,1), which was later overwritten, so no row matches.
eventually([&] {
auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c2 > 1 AND c2 < 5 AND v = 1 ALLOW FILTERING;").get0();
assert_that(msg).is_rows().with_rows({});
});
// c1 > 1 excludes the (1,1,1,3) row, leaving two of the v = 3 rows.
eventually([&] {
auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 1 AND c2 > 2 AND v = 3 ALLOW FILTERING;").get0();
assert_that(msg).is_rows().with_rows({
{
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3)
},
{
int32_type->decompose(1),
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3)
}
});
});
// Inequality on the second partition key component: pk2 > 1 leaves only (1,3,3,3).
eventually([&] {
auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND pk2 > 1 AND c2 > 2 AND v = 3 ALLOW FILTERING;").get0();
assert_that(msg).is_rows().with_rows({{
int32_type->decompose(1),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3),
int32_type->decompose(3)
}});
});
// Range restrictions on both partition key components, IN on both clustering
// columns, and a range on the indexed column: only the pk1 = 2 rows with v = 2 qualify.
eventually([&] {
auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 >= 2 AND pk2 <=3 AND c1 IN(0,1,2) AND c2 IN(0,1,2) AND v < 3 ALLOW FILTERING;").get0();
assert_that(msg).is_rows().with_rows({
{
int32_type->decompose(2),
int32_type->decompose(2),
int32_type->decompose(2),
int32_type->decompose(2),
int32_type->decompose(2)
},
{
int32_type->decompose(2),
int32_type->decompose(1),
int32_type->decompose(1),
int32_type->decompose(2),
int32_type->decompose(2)
},
{
int32_type->decompose(2),
int32_type->decompose(1),
int32_type->decompose(2),
int32_type->decompose(2),
int32_type->decompose(2)
}
});
});
});
}