From 1953c5fa61d64c7e17ffc27f5932dcf7fc4454ea Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 29 Nov 2018 23:06:19 +0000 Subject: [PATCH] Merge 'Fix filtering with LIMIT' from Piotr " This series adds proper handling of filtering queries with LIMIT. Previously the limit was erroneously applied before filtering, which leads to truncated results. To avoid that, paged filtering queries now use an enhanced pager, which remembers how many rows dropped and uses that information to fetch for more pages if the limit is not yet reached. For unpaged filtering queries, paging is done internally as in case of aggregations to avoid returning keeping huge results in memory. Also, previously, all limited queries used the page size counted from max(page size, limit). It's not good for filtering, because with LIMIT 1 we would then query for rows one-by-one. To avoid that, filtered queries ask for the whole page and the results are truncated if need be afterwards. Tests: unit (release) " * 'fix_filtering_with_limit_2' of https://github.com/psarna/scylla: tests: add filtering with LIMIT test tests: split filtering tests from cql_query_test cql3: add proper handling of filtering with LIMIT service/pager: use dropped_rows to adjust how many rows to read service/pager: virtualize max_rows_to_fetch function cql3: add counting dropped rows in filtering pager (cherry picked from commit 1afda28cf320dd22c78690923ed71b8e6b99c6ab) --- configure.py | 1 + cql3/selection/selection.cc | 14 +- cql3/selection/selection.hh | 13 +- cql3/statements/select_statement.cc | 41 +- service/pager/query_pager.hh | 4 + service/pager/query_pagers.cc | 15 +- test.py | 1 + tests/cql_query_test.cc | 749 --------------------------- tests/filtering_test.cc | 774 ++++++++++++++++++++++++++++ 9 files changed, 841 insertions(+), 771 deletions(-) create mode 100644 tests/filtering_test.cc diff --git a/configure.py b/configure.py index cc87cffb34..82bf58ad44 100755 --- a/configure.py +++ b/configure.py @@ -271,6 +271,7 @@ scylla_tests = [ 'tests/perf/perf_sstable', 'tests/cql_query_test', 'tests/secondary_index_test', + 'tests/filtering_test', 'tests/storage_proxy_test', 'tests/schema_change_test', 'tests/mutation_reader_test', diff --git a/cql3/selection/selection.cc b/cql3/selection/selection.cc index 69ecdb06c3..7eeff9c190 100644 --- a/cql3/selection/selection.cc +++ b/cql3/selection/selection.cc @@ -339,7 +339,7 @@ std::unique_ptr result_set_builder::build() { return std::move(_result_set); } -bool result_set_builder::restrictions_filter::operator()(const selection& selection, +bool result_set_builder::restrictions_filter::do_filter(const selection& selection, const std::vector& partition_key, const std::vector& clustering_key, const query::result_row_view& static_row, @@ -427,6 +427,18 @@ bool result_set_builder::restrictions_filter::operator()(const selection& select return true; } +bool result_set_builder::restrictions_filter::operator()(const selection& selection, + const std::vector& partition_key, + const std::vector& clustering_key, + const query::result_row_view& static_row, + const query::result_row_view& row) const { + bool accepted = do_filter(selection, partition_key, clustering_key, static_row, row); + if (!accepted) { + ++_rows_dropped; + } + return accepted; +} + api::timestamp_type result_set_builder::timestamp_of(size_t idx) { return _timestamps[idx]; } diff --git a/cql3/selection/selection.hh b/cql3/selection/selection.hh index 3663e36382..f3ddb36e2c 100644 --- a/cql3/selection/selection.hh +++ b/cql3/selection/selection.hh @@ -259,12 +259,16 @@ public: } void reset() { } + uint32_t get_rows_dropped() const { + return 0; + } }; class restrictions_filter { ::shared_ptr _restrictions; const query_options& _options; mutable bool _current_partition_key_does_not_match = false; mutable bool _current_static_row_does_not_match = false; + mutable uint32_t _rows_dropped = 0; public: restrictions_filter() = default; explicit restrictions_filter(::shared_ptr restrictions, const query_options& options) : _restrictions(restrictions), _options(options) {} @@ -272,7 +276,13 @@ public: void reset() { _current_partition_key_does_not_match = false; _current_static_row_does_not_match = false; + _rows_dropped = 0; } + uint32_t get_rows_dropped() const { + return _rows_dropped; + } + private: + bool do_filter(const selection& selection, const std::vector& pk, const std::vector& ck, const query::result_row_view& static_row, const query::result_row_view& row) const; }; result_set_builder(const selection& s, gc_clock::time_point now, cql_serialization_format sf); @@ -372,7 +382,7 @@ public: } } - void accept_partition_end(const query::result_row_view& static_row) { + uint32_t accept_partition_end(const query::result_row_view& static_row) { if (_row_count == 0) { _builder.new_row(); auto static_row_iterator = static_row.iterator(); @@ -386,6 +396,7 @@ public: } } } + return _filter.get_rows_dropped(); } }; diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index da119d9626..9a3b7434b3 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -383,8 +383,9 @@ select_statement::do_execute(service::storage_proxy& proxy, int32_t limit = get_limit(options); auto now = gc_clock::now(); + const bool restrictions_need_filtering = _restrictions->need_filtering(); ++_stats.reads; - _stats.filtered_reads += _restrictions->need_filtering(); + _stats.filtered_reads += restrictions_need_filtering; auto command = ::make_lw_shared(_schema->id(), _schema->version(), make_partition_slice(options), limit, now, tracing::make_trace_info(state.get_trace_state()), query::max_partitions, utils::UUID(), options.get_timestamp(state)); @@ -396,14 +397,15 @@ select_statement::do_execute(service::storage_proxy& proxy, // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM. // If we user provided a page_size we'll use that to page internally (because why not), otherwise we use our default // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). - auto aggregate = _selection->is_aggregate(); - if (aggregate && page_size <= 0) { + const bool aggregate = _selection->is_aggregate(); + const bool nonpaged_filtering = restrictions_need_filtering && page_size <= 0; + if (aggregate || nonpaged_filtering) { page_size = DEFAULT_COUNT_PAGE_SIZE; } auto key_ranges = _restrictions->get_partition_key_ranges(options); - if (!aggregate && (page_size <= 0 + if (!aggregate && !restrictions_need_filtering && (page_size <= 0 || !service::pager::query_pagers::may_need_paging(*_schema, page_size, *command, key_ranges))) { return execute(proxy, command, std::move(key_ranges), state, options, now); @@ -412,22 +414,25 @@ select_statement::do_execute(service::storage_proxy& proxy, command->slice.options.set(); auto timeout_duration = options.get_timeout_config().*get_timeout_config_selector(); auto p = service::pager::query_pagers::pager(_schema, _selection, - state, options, command, std::move(key_ranges), _stats, _restrictions->need_filtering() ? _restrictions : nullptr); + state, options, command, std::move(key_ranges), _stats, restrictions_need_filtering ? _restrictions : nullptr); - if (aggregate) { + if (aggregate || nonpaged_filtering) { return do_with( cql3::selection::result_set_builder(*_selection, now, options.get_cql_serialization_format()), - [this, p, page_size, now, timeout_duration](auto& builder) { + [this, p, page_size, now, timeout_duration, restrictions_need_filtering, limit](auto& builder) { return do_until([p] {return p->is_exhausted();}, [p, &builder, page_size, now, timeout_duration] { auto timeout = db::timeout_clock::now() + timeout_duration; return p->fetch_page(builder, page_size, now, timeout); } - ).then([this, &builder] { + ).then([this, &builder, restrictions_need_filtering, limit] { auto rs = builder.build(); + if (restrictions_need_filtering) { + rs->trim(limit); + _stats.filtered_rows_matched_total += rs->size(); + } update_stats_rows_read(rs->size()); - _stats.filtered_rows_matched_total += _restrictions->need_filtering() ? rs->size() : 0; auto msg = ::make_shared(result(std::move(rs))); return make_ready_future>(std::move(msg)); }); @@ -441,7 +446,7 @@ select_statement::do_execute(service::storage_proxy& proxy, } auto timeout = db::timeout_clock::now() + timeout_duration; - if (_selection->is_trivial() && !_restrictions->need_filtering()) { + if (_selection->is_trivial() && !restrictions_need_filtering) { return p->fetch_page_generator(page_size, now, timeout, _stats).then([this, p, limit] (result_generator generator) { auto meta = [&] () -> shared_ptr { if (!p->is_exhausted()) { @@ -460,14 +465,17 @@ select_statement::do_execute(service::storage_proxy& proxy, } return p->fetch_page(page_size, now, timeout).then( - [this, p, &options, limit, now](std::unique_ptr rs) { + [this, p, &options, limit, now, restrictions_need_filtering](std::unique_ptr rs) { if (!p->is_exhausted()) { rs->get_metadata().set_paging_state(p->state()); } + if (restrictions_need_filtering) { + rs->trim(limit); + _stats.filtered_rows_matched_total += rs->size(); + } update_stats_rows_read(rs->size()); - _stats.filtered_rows_matched_total += _restrictions->need_filtering() ? rs->size() : 0; auto msg = ::make_shared(result(std::move(rs))); return make_ready_future>(std::move(msg)); }); @@ -714,7 +722,8 @@ select_statement::process_results(foreign_ptr> resu const query_options& options, gc_clock::time_point now) { - bool fast_path = !needs_post_query_ordering() && _selection->is_trivial() && !_restrictions->need_filtering(); + const bool restrictions_need_filtering = _restrictions->need_filtering(); + const bool fast_path = !needs_post_query_ordering() && _selection->is_trivial() && !restrictions_need_filtering; if (fast_path) { return make_shared(result( result_generator(_schema, std::move(results), std::move(cmd), _selection, _stats), @@ -724,7 +733,7 @@ select_statement::process_results(foreign_ptr> resu cql3::selection::result_set_builder builder(*_selection, now, options.get_cql_serialization_format()); - if (_restrictions->need_filtering()) { + if (restrictions_need_filtering) { results->ensure_counts(); _stats.filtered_rows_read_total += *results->row_count(); query::result_view::consume(*results, cmd->slice, @@ -743,9 +752,11 @@ select_statement::process_results(foreign_ptr> resu rs->reverse(); } rs->trim(cmd->row_limit); + } else if (restrictions_need_filtering) { + rs->trim(cmd->row_limit); } update_stats_rows_read(rs->size()); - _stats.filtered_rows_matched_total += _restrictions->need_filtering() ? rs->size() : 0; + _stats.filtered_rows_matched_total += restrictions_need_filtering ? rs->size() : 0; return ::make_shared(result(std::move(rs))); } diff --git a/service/pager/query_pager.hh b/service/pager/query_pager.hh index 0bc400e0f4..d5e63c1b1b 100644 --- a/service/pager/query_pager.hh +++ b/service/pager/query_pager.hh @@ -151,6 +151,10 @@ protected: void handle_result(Visitor&& visitor, const foreign_ptr>& results, uint32_t page_size, gc_clock::time_point now); + + virtual uint32_t max_rows_to_fetch(uint32_t page_size) { + return std::min(_max, page_size); + } }; } diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index 9812bbbfc9..f542a00e4d 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -55,7 +55,7 @@ struct noop_visitor { void accept_new_partition(const partition_key& key, uint32_t row_count) { } void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) { } void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) { } - void accept_partition_end(const query::result_row_view& static_row) { } + uint32_t accept_partition_end(const query::result_row_view& static_row) { return 0; } }; static bool has_clustering_keys(const schema& s, const query::read_command& cmd) { @@ -202,7 +202,7 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd) } } - auto max_rows = std::min(_max, page_size); + auto max_rows = max_rows_to_fetch(page_size); // We always need PK so we can determine where to start next. _cmd->slice.options.set(); @@ -284,6 +284,10 @@ public: std::move(qr.query_result), page_size, now); }); } +protected: + virtual uint32_t max_rows_to_fetch(uint32_t page_size) override { + return page_size; + } }; template @@ -291,6 +295,7 @@ class query_pager::query_result_visitor : public Base { using visitor = Base; public: uint32_t total_rows = 0; + uint32_t dropped_rows = 0; std::experimental::optional last_pkey; std::experimental::optional last_ckey; @@ -317,7 +322,7 @@ public: visitor::accept_new_row(static_row, row); } void accept_partition_end(const query::result_row_view& static_row) { - visitor::accept_partition_end(static_row); + dropped_rows += visitor::accept_partition_end(static_row); } }; @@ -348,9 +353,9 @@ public: update_slice(*_last_pkey); } - row_count = v.total_rows; + row_count = v.total_rows - v.dropped_rows; _max = _max - row_count; - _exhausted = (v.total_rows < page_size && !results->is_short_read()) || _max == 0; + _exhausted = (v.total_rows < page_size && !results->is_short_read() && v.dropped_rows == 0) || _max == 0; _last_pkey = v.last_pkey; _last_ckey = v.last_ckey; } else { diff --git a/test.py b/test.py index cd8f665e59..3a36c0c39f 100755 --- a/test.py +++ b/test.py @@ -44,6 +44,7 @@ boost_tests = [ 'serialized_action_test', 'cql_query_test', 'secondary_index_test', + 'filtering_test', 'storage_proxy_test', 'schema_change_test', 'sstable_mutation_test', diff --git a/tests/cql_query_test.cc b/tests/cql_query_test.cc index e489b38008..cec2534ca3 100644 --- a/tests/cql_query_test.cc +++ b/tests/cql_query_test.cc @@ -3118,677 +3118,6 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) { }); } -SEASTAR_TEST_CASE(test_allow_filtering_check) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (p int, c int, v int, PRIMARY KEY(p, c));").get(); - e.require_table_exists("ks", "t").get(); - - for (int i = 0; i < 3; ++i) { - for (int j = 0; j <3; ++j) { - e.execute_cql(sprint("INSERT INTO t(p, c, v) VALUES (%s, %s, %s)", i, j, j)).get(); - } - } - - std::vector queries = { - "SELECT * FROM t WHERE p = 1", - "SELECT * FROM t WHERE p = 1 and c > 2", - "SELECT * FROM t WHERE p = 1 and c = 2" - }; - - for (const sstring& q : queries) { - e.execute_cql(q).get(); - e.execute_cql(q + " ALLOW FILTERING").get(); - } - - queries = { - "SELECT * FROM t WHERE c = 2", - "SELECT * FROM t WHERE c <= 4" - }; - - for (const sstring& q : queries) { - BOOST_CHECK_THROW(e.execute_cql(q).get(), exceptions::invalid_request_exception); - e.execute_cql(q + " ALLOW FILTERING").get(); - } - - e.execute_cql("CREATE TABLE t2 (p int PRIMARY KEY, a int, b int);").get(); - e.require_table_exists("ks", "t2").get(); - e.execute_cql("CREATE INDEX ON t2(a)").get(); - for (int i = 0; i < 5; ++i) { - e.execute_cql(sprint("INSERT INTO t2 (p, a, b) VALUES (%s, %s, %s)", i, i * 10, i * 100)).get(); - } - - queries = { - "SELECT * FROM t2 WHERE p = 1", - "SELECT * FROM t2 WHERE a = 20" - }; - - for (const sstring& q : queries) { - e.execute_cql(q).get(); - e.execute_cql(q + " ALLOW FILTERING").get(); - } - - queries = { - "SELECT * FROM t2 WHERE a = 20 AND b = 200" - }; - - for (const sstring& q : queries) { - BOOST_CHECK_THROW(e.execute_cql(q).get(), exceptions::invalid_request_exception); - e.execute_cql(q + " ALLOW FILTERING").get(); - } - }); -} - -SEASTAR_TEST_CASE(test_allow_filtering_pk_ck) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY ((a, b), c, d));").get(); - e.require_table_exists("ks", "t").get(); - e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (11, 12, 13, 14, 15)").get(); - e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (11, 15, 16, 17, 18)").get(); - e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (21, 22, 23, 24, 25)").get(); - e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (31, 32, 33, 34, 35)").get(); - - auto msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c = 16").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }}); - - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 12 AND c > 13 AND d = 14").get(), exceptions::invalid_request_exception); - - msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c = 16").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }}); - - msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c > 13 AND d >= 17 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }}); - - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 12 AND c > 13 AND d > 17").get(), exceptions::invalid_request_exception); - - msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c > 13 AND d >= 17 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }}); - - msg = e.execute_cql("SELECT * FROM t WHERE a <= 11 AND c > 15 AND d >= 16 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }}); - - msg = e.execute_cql("SELECT * FROM t WHERE a <= 11 AND b >= 15 AND c > 15 AND d >= 16 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }}); - - msg = e.execute_cql("SELECT * FROM t WHERE a <= 100 AND b >= 15 AND c > 0 AND d <= 100 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(11), - int32_type->decompose(15), - int32_type->decompose(16), - int32_type->decompose(17), - int32_type->decompose(18), - }, - { - int32_type->decompose(31), - int32_type->decompose(32), - int32_type->decompose(33), - int32_type->decompose(34), - int32_type->decompose(35), - }, - { - int32_type->decompose(21), - int32_type->decompose(22), - int32_type->decompose(23), - int32_type->decompose(24), - int32_type->decompose(25), - } - }); - - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE a <= 11 AND c > 15 AND d >= 16").get(), exceptions::invalid_request_exception); - }); -} - -SEASTAR_TEST_CASE(test_allow_filtering_clustering_column) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (k int, c int, v int, PRIMARY KEY (k, c));").get(); - e.require_table_exists("ks", "t").get(); - - e.execute_cql("INSERT INTO t (k, c, v) VALUES (1, 2, 1)").get(); - e.execute_cql("INSERT INTO t (k, c, v) VALUES (1, 3, 2)").get(); - e.execute_cql("INSERT INTO t (k, c, v) VALUES (2, 2, 3)").get(); - - auto msg = e.execute_cql("SELECT * FROM t WHERE k = 1").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(1) - }, - { - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(2) - } - }); - - msg = e.execute_cql("SELECT * FROM t WHERE k = 1 AND c > 2").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(2) - }}); - - msg = e.execute_cql("SELECT * FROM t WHERE k = 1 AND c = 2").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(1) - }}); - - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 2").get(), exceptions::invalid_request_exception); - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c > 2 AND c <= 4").get(), exceptions::invalid_request_exception); - - msg = e.execute_cql("SELECT * FROM t WHERE c = 2 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(1) - }, - { - int32_type->decompose(2), - int32_type->decompose(2), - int32_type->decompose(3) - } - }); - - msg = e.execute_cql("SELECT * FROM t WHERE c > 2 AND c <= 4 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(2) - }}); - }); -} - -SEASTAR_TEST_CASE(test_allow_filtering_static_column) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (a int, b int, c int, s int static, PRIMARY KEY(a, b));").get(); - e.require_table_exists("ks", "t").get(); - e.execute_cql("CREATE INDEX ON t(c)").get(); - - e.execute_cql("INSERT INTO t (a, b, c, s) VALUES (1, 1, 1, 1)").get(); - e.execute_cql("INSERT INTO t (a, b, c) VALUES (1, 2, 1)").get(); - e.execute_cql("INSERT INTO t (a, s) VALUES (3, 3)").get(); - e.execute_cql("INSERT INTO t (a, b, c, s) VALUES (2, 1, 1, 2)").get(); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t WHERE c = 1 AND s = 2 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(2), - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(1) - }}); - }); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t WHERE c = 1 AND s = 1 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1) - }, - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(1), - int32_type->decompose(1) - } - }); - }); - }); -} - -SEASTAR_TEST_CASE(test_allow_filtering_multiple_regular) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, f list, g set, PRIMARY KEY(a, b));").get(); - e.require_table_exists("ks", "t").get(); - - e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 1, 1, 1, 1, [1], {})").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 2, 3, 4, 5, [1, 2], {1, 2, 3})").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 3, 5, 1, 9, [1, 2, 3], {1, 2})").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 4, 5, 7, 5, [], {1})").get(); - - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 5").get(), exceptions::invalid_request_exception); - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE d = 1").get(), exceptions::invalid_request_exception); - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE e = 5").get(), exceptions::invalid_request_exception); - - // Collection filtering queries are not supported yet - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE f contains 2").get(), exceptions::invalid_request_exception); - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE g contains 1").get(), exceptions::invalid_request_exception); - - auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 3 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(3), - int32_type->decompose(4), - int32_type->decompose(5) - }}); - - msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE e >= 5 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(3), - int32_type->decompose(4), - int32_type->decompose(5) - }, - { - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(5), - int32_type->decompose(1), - int32_type->decompose(9) - }, - { - int32_type->decompose(1), - int32_type->decompose(4), - int32_type->decompose(5), - int32_type->decompose(7), - int32_type->decompose(5) - } - }); - - msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 5 and e = 9 and d = 1 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(5), - int32_type->decompose(1), - int32_type->decompose(9) - }}); - - cql3::prepared_cache_key_type prepared_id = e.prepare("SELECT a, b, c, d, e FROM t WHERE a = ? and d = ? ALLOW FILTERING").get0(); - std::vector raw_values { - cql3::raw_value::make_value(int32_type->decompose(1)), - cql3::raw_value::make_value(int32_type->decompose(1)) - }; - msg = e.execute_prepared(prepared_id, raw_values).get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1) - }, - { - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(5), - int32_type->decompose(1), - int32_type->decompose(9) - } - }); - - prepared_id = e.prepare("SELECT a, b, c, d, e FROM t WHERE a = ? and d = ? ALLOW FILTERING").get0(); - raw_values[1] = cql3::raw_value::make_value(int32_type->decompose(9)); - msg = e.execute_prepared(prepared_id, raw_values).get0(); - assert_that(msg).is_rows().with_size(0); - - - }); -} - -SEASTAR_TEST_CASE(test_allow_filtering_desc) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY((a, b), c, d)) WITH CLUSTERING ORDER BY (c DESC);").get(); - e.require_table_exists("ks", "t").get(); - - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 1, 1, 1)").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 3, 4, 5)").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 5, 1, 9)").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 6, 7, 5)").get(); - - auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c > 3 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(6), - int32_type->decompose(7), - int32_type->decompose(5) - }, - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(5), - int32_type->decompose(1), - int32_type->decompose(9) - } - }); - - msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c < 4 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(3), - int32_type->decompose(4), - int32_type->decompose(5) - }, - { - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1) - } - }); - - msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 4 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_size(0); - }); -} - -SEASTAR_TEST_CASE(test_allow_filtering_with_secondary_index) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY(a, b));").get(); - e.require_table_exists("ks", "t").get(); - e.execute_cql("CREATE INDEX ON t(c)").get(); - - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 1, 1, 1, 1)").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 3, 4, 5)").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 3, 5, 1, 9)").get(); - e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 4, 5, 7, 5)").get(); - - auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 3").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(3), - int32_type->decompose(4), - int32_type->decompose(5) - }}); - - BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 5 and d = 1").get(), exceptions::invalid_request_exception); - - msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 5 and d = 1 ALLOW FILTERING").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(5), - int32_type->decompose(1), - int32_type->decompose(9) - }}); - - e.execute_cql("CREATE TABLE t2 (pk1 int, pk2 int, c1 int, c2 int, v int, PRIMARY KEY ((pk1, pk2), c1, c2));").get(); - e.execute_cql("CREATE INDEX ON t2(v)").get(); - for (int i = 1; i <= 5; ++i) { - for (int j = 1; j <= 2; ++j) { - e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, 1, 1, 1, i)).get(); - e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, 1, 1, i, i)).get(); - e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, 1, i, i, i)).get(); - e.execute_cql(sprint("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES (%s, %s, %s, %s, %s)", j, i, i, i, i)).get(); - } - } - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 0 AND c1 < 5 AND c2 = 1 AND v = 3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_rows({}); - }); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 0 AND c1 < 5 AND c2 = 3 AND v = 3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3) - }, - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(3) - }, - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3) - } - }); - }); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c2 > 1 AND c2 < 5 AND v = 1 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_rows({}); - }); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 1 AND c2 > 2 AND v = 3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3) - }, - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3) - } - }); - }); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND pk2 > 1 AND c2 > 2 AND v = 3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_rows({{ - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3), - int32_type->decompose(3) - }}); - }); - - eventually([&] { - auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 >= 2 AND pk2 <=3 AND c1 IN(0,1,2) AND c2 IN(0,1,2) AND v < 3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_rows({ - { - int32_type->decompose(2), - int32_type->decompose(2), - int32_type->decompose(2), - int32_type->decompose(2), - int32_type->decompose(2) - }, - { - int32_type->decompose(2), - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(2) - }, - { - int32_type->decompose(2), - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(2), - int32_type->decompose(2) - } - }); - }); - }); -} - -SEASTAR_TEST_CASE(test_in_restriction_on_not_last_partition_key) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (a int,b int,c int,d int,PRIMARY KEY ((a, b), c));").get(); - e.require_table_exists("ks", "t").get(); - - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (1,1,1,100); ").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (1,1,2,200); ").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (1,1,3,300); ").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (1,2,1,300); ").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (1,3,1,1300);").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (1,3,2,1400);").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (2,3,2,1400);").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (2,1,2,1400);").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (2,1,3,1300);").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (2,2,3,1300);").get(); - e.execute_cql("INSERT INTO t (a,b,c,d) VALUES (3,1,3,1300);").get(); - - { - auto msg = e.execute_cql("SELECT * FROM t WHERE a IN (1,2) AND b IN (2,3) AND c>=2 AND c<=3;").get0(); - assert_that(msg).is_rows().with_rows_ignore_order({ - { - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(2), - int32_type->decompose(1400), - }, - { - int32_type->decompose(2), - int32_type->decompose(2), - int32_type->decompose(3), - int32_type->decompose(1300), - }, - { - int32_type->decompose(2), - int32_type->decompose(3), - int32_type->decompose(2), - int32_type->decompose(1400), - } - }); - } - { - auto msg = e.execute_cql("SELECT * FROM t WHERE a IN (1,3) AND b=1 AND c>=2 AND c<=3;").get0(); - assert_that(msg).is_rows().with_rows_ignore_order({ - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(2), - int32_type->decompose(200), - }, - { - int32_type->decompose(1), - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(300), - }, - { - int32_type->decompose(3), - int32_type->decompose(1), - int32_type->decompose(3), - int32_type->decompose(1300), - } - }); - } - }); -} - -SEASTAR_TEST_CASE(test_static_multi_cell_static_lists_with_ckey) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE t (p int, c int, slist list static, v int, PRIMARY KEY (p, c));").get(); - e.execute_cql("INSERT INTO t (p, c, slist, v) VALUES (1, 1, [1], 1); ").get(); - - { - e.execute_cql("UPDATE t SET slist[0] = 3, v = 3 WHERE p = 1 AND c = 1;").get(); - auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0(); - auto slist_type = list_type_impl::get_instance(int32_type, true); - assert_that(msg).is_rows().with_row({ - { slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{3}}))) }, - { int32_type->decompose(3) } - }); - } - { - e.execute_cql("UPDATE t SET slist = [4], v = 4 WHERE p = 1 AND c = 1;").get(); - auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0(); - auto slist_type = list_type_impl::get_instance(int32_type, true); - assert_that(msg).is_rows().with_row({ - { slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{4}}))) }, - { int32_type->decompose(4) } - }); - } - { - e.execute_cql("UPDATE t SET slist = [3] + slist , v = 5 WHERE p = 1 AND c = 1;").get(); - auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0(); - auto slist_type = list_type_impl::get_instance(int32_type, true); - assert_that(msg).is_rows().with_row({ - { slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) }, - { int32_type->decompose(5) } - }); - } - { - e.execute_cql("UPDATE t SET slist = slist + [5] , v = 6 WHERE p = 1 AND c = 1;").get(); - auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0(); - auto slist_type = list_type_impl::get_instance(int32_type, true); - assert_that(msg).is_rows().with_row({ - { slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4, 5}))) }, - { int32_type->decompose(6) } - }); - } - { - e.execute_cql("DELETE slist[2] from t WHERE p = 1;").get(); - auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0(); - auto slist_type = list_type_impl::get_instance(int32_type, true); - assert_that(msg).is_rows().with_row({ - { slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) }, - { int32_type->decompose(6) } - }); - } - { - e.execute_cql("UPDATE t SET slist = slist - [4] , v = 7 WHERE p = 1 AND c = 1;").get(); - auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0(); - auto slist_type = list_type_impl::get_instance(int32_type, true); - assert_that(msg).is_rows().with_row({ - { slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3}))) }, - { int32_type->decompose(7) } - }); - } - }); -} - /** * A class to represent a single multy-column slice expression. @@ -4044,81 +3373,3 @@ SEASTAR_TEST_CASE(test_select_with_mixed_order_table) { } }); } - -SEASTAR_TEST_CASE(test_filtering) { - return do_with_cql_env_thread([] (cql_test_env& e) { - e.execute_cql("CREATE TABLE cf (k int, v int,m int,n int,o int,p int static, PRIMARY KEY ((k,v),m,n));").get(); - e.execute_cql( - "BEGIN UNLOGGED BATCH \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (1, 1, 1, 1, 1 ,1 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (2, 1, 2, 1, 2 ,2 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (3, 1, 3, 1, 3 ,3 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (4, 2, 1, 2, 4 ,4 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (5, 2, 2, 2, 5 ,5 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (6, 2, 3, 2, 6 ,6 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (7, 3, 1, 3, 7 ,7 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (8, 3, 2, 3, 8 ,8 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (9, 3, 3, 3, 9 ,9 ); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (10, 4, 1, 4,10,10); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (11, 4, 2, 4,11,11); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (12, 5, 3, 5,12,12); \n" - "INSERT INTO cf (k, v, m, n, o, p) VALUES (12, 5, 4, 5,13,13); \n" - "APPLY BATCH;" - ).get(); - - // Notice the with_serialized_columns_count() check before the set comparison. - // Since we are dealing with the result set before serializing to the client, - // there is an extra column that is used for the filtering, this column will - // not be present in the responce to the client and with_serialized_columns_count() - // verifies exactly that. - - // test filtering on partition keys - { - auto msg = e.execute_cql("SELECT k FROM cf WHERE v=3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ - { int32_type->decompose(7), int32_type->decompose(3)}, - { int32_type->decompose(8), int32_type->decompose(3) }, - { int32_type->decompose(9), int32_type->decompose(3) }, - }); - } - - // test filtering on clustering keys - { - auto msg = e.execute_cql("SELECT k FROM cf WHERE n=4 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ - { int32_type->decompose(10), int32_type->decompose(4) }, - { int32_type->decompose(11), int32_type->decompose(4) }, - }); - } - - //test filtering on regular columns - { - auto msg = e.execute_cql("SELECT k FROM cf WHERE o>7 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ - { int32_type->decompose(8), int32_type->decompose(8) }, - { int32_type->decompose(9), int32_type->decompose(9) }, - { int32_type->decompose(10), int32_type->decompose(10) }, - { int32_type->decompose(11), int32_type->decompose(11) }, - { int32_type->decompose(12), int32_type->decompose(12) }, - { int32_type->decompose(12), int32_type->decompose(13) }, - }); - } - - //test filtering on static columns - { - auto msg = e.execute_cql("SELECT k FROM cf WHERE p>=10 AND p<=12 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ - { int32_type->decompose(10), int32_type->decompose(10) }, - { int32_type->decompose(11), int32_type->decompose(11) }, - }); - } - //test filtering with count - { - auto msg = e.execute_cql("SELECT COUNT(k) FROM cf WHERE n>3 ALLOW FILTERING;").get0(); - assert_that(msg).is_rows().with_serialized_columns_count(1).with_size(1).with_rows_ignore_order({ - { long_type->decompose(4L), int32_type->decompose(4) }, - }); - } - - }); -} diff --git a/tests/filtering_test.cc b/tests/filtering_test.cc new file mode 100644 index 0000000000..33269207eb --- /dev/null +++ b/tests/filtering_test.cc @@ -0,0 +1,774 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + +#include +#include +#include +#include +#include + +#include + +#include "tests/test-utils.hh" +#include "tests/cql_test_env.hh" +#include "tests/cql_assertions.hh" + +#include "seastar/core/future-util.hh" +#include "seastar/core/sleep.hh" +#include "transport/messages/result_message.hh" +#include "utils/big_decimal.hh" + +using namespace std::literals::chrono_literals; + + +SEASTAR_TEST_CASE(test_allow_filtering_check) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (p int, c int, v int, PRIMARY KEY(p, c));").get(); + e.require_table_exists("ks", "t").get(); + + for (int i = 0; i < 3; ++i) { + for (int j = 0; j <3; ++j) { + e.execute_cql(format("INSERT INTO t(p, c, v) VALUES ({}, {}, {})", i, j, j)).get(); + } + } + + std::vector queries = { + "SELECT * FROM t WHERE p = 1", + "SELECT * FROM t WHERE p = 1 and c > 2", + "SELECT * FROM t WHERE p = 1 and c = 2" + }; + + for (const sstring& q : queries) { + e.execute_cql(q).get(); + e.execute_cql(q + " ALLOW FILTERING").get(); + } + + queries = { + "SELECT * FROM t WHERE c = 2", + "SELECT * FROM t WHERE c <= 4" + }; + + for (const sstring& q : queries) { + BOOST_CHECK_THROW(e.execute_cql(q).get(), exceptions::invalid_request_exception); + e.execute_cql(q + " ALLOW FILTERING").get(); + } + + e.execute_cql("CREATE TABLE t2 (p int PRIMARY KEY, a int, b int);").get(); + e.require_table_exists("ks", "t2").get(); + e.execute_cql("CREATE INDEX ON t2(a)").get(); + for (int i = 0; i < 5; ++i) { + e.execute_cql(format("INSERT INTO t2 (p, a, b) VALUES ({}, {}, {})", i, i * 10, i * 100)).get(); + } + + queries = { + "SELECT * FROM t2 WHERE p = 1", + "SELECT * FROM t2 WHERE a = 20" + }; + + for (const sstring& q : queries) { + e.execute_cql(q).get(); + e.execute_cql(q + " ALLOW FILTERING").get(); + } + + queries = { + "SELECT * FROM t2 WHERE a = 20 AND b = 200" + }; + + for (const sstring& q : queries) { + BOOST_CHECK_THROW(e.execute_cql(q).get(), exceptions::invalid_request_exception); + e.execute_cql(q + " ALLOW FILTERING").get(); + } + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_pk_ck) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY ((a, b), c, d));").get(); + e.require_table_exists("ks", "t").get(); + e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (11, 12, 13, 14, 15)").get(); + e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (11, 15, 16, 17, 18)").get(); + e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (21, 22, 23, 24, 25)").get(); + e.execute_cql("INSERT INTO t (a,b,c,d,e) VALUES (31, 32, 33, 34, 35)").get(); + + auto msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c = 16").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }}); + + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 12 AND c > 13 AND d = 14").get(), exceptions::invalid_request_exception); + + msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c = 16").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }}); + + msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c > 13 AND d >= 17 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }}); + + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 12 AND c > 13 AND d > 17").get(), exceptions::invalid_request_exception); + + msg = e.execute_cql("SELECT * FROM t WHERE a = 11 AND b = 15 AND c > 13 AND d >= 17 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }}); + + msg = e.execute_cql("SELECT * FROM t WHERE a <= 11 AND c > 15 AND d >= 16 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }}); + + msg = e.execute_cql("SELECT * FROM t WHERE a <= 11 AND b >= 15 AND c > 15 AND d >= 16 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }}); + + msg = e.execute_cql("SELECT * FROM t WHERE a <= 100 AND b >= 15 AND c > 0 AND d <= 100 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(11), + int32_type->decompose(15), + int32_type->decompose(16), + int32_type->decompose(17), + int32_type->decompose(18), + }, + { + int32_type->decompose(31), + int32_type->decompose(32), + int32_type->decompose(33), + int32_type->decompose(34), + int32_type->decompose(35), + }, + { + int32_type->decompose(21), + int32_type->decompose(22), + int32_type->decompose(23), + int32_type->decompose(24), + int32_type->decompose(25), + } + }); + + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE a <= 11 AND c > 15 AND d >= 16").get(), exceptions::invalid_request_exception); + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_clustering_column) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (k int, c int, v int, PRIMARY KEY (k, c));").get(); + e.require_table_exists("ks", "t").get(); + + e.execute_cql("INSERT INTO t (k, c, v) VALUES (1, 2, 1)").get(); + e.execute_cql("INSERT INTO t (k, c, v) VALUES (1, 3, 2)").get(); + e.execute_cql("INSERT INTO t (k, c, v) VALUES (2, 2, 3)").get(); + + auto msg = e.execute_cql("SELECT * FROM t WHERE k = 1").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(1) + }, + { + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(2) + } + }); + + msg = e.execute_cql("SELECT * FROM t WHERE k = 1 AND c > 2").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(2) + }}); + + msg = e.execute_cql("SELECT * FROM t WHERE k = 1 AND c = 2").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(1) + }}); + + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 2").get(), exceptions::invalid_request_exception); + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c > 2 AND c <= 4").get(), exceptions::invalid_request_exception); + + msg = e.execute_cql("SELECT * FROM t WHERE c = 2 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(1) + }, + { + int32_type->decompose(2), + int32_type->decompose(2), + int32_type->decompose(3) + } + }); + + msg = e.execute_cql("SELECT * FROM t WHERE c > 2 AND c <= 4 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(2) + }}); + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_static_column) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (a int, b int, c int, s int static, PRIMARY KEY(a, b));").get(); + e.require_table_exists("ks", "t").get(); + e.execute_cql("CREATE INDEX ON t(c)").get(); + + e.execute_cql("INSERT INTO t (a, b, c, s) VALUES (1, 1, 1, 1)").get(); + e.execute_cql("INSERT INTO t (a, b, c) VALUES (1, 2, 1)").get(); + e.execute_cql("INSERT INTO t (a, s) VALUES (3, 3)").get(); + e.execute_cql("INSERT INTO t (a, b, c, s) VALUES (2, 1, 1, 2)").get(); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t WHERE c = 1 AND s = 2 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(2), + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(1) + }}); + }); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t WHERE c = 1 AND s = 1 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1) + }, + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(1), + int32_type->decompose(1) + } + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_multiple_regular) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, f list, g set, PRIMARY KEY(a, b));").get(); + e.require_table_exists("ks", "t").get(); + + e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 1, 1, 1, 1, [1], {})").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 2, 3, 4, 5, [1, 2], {1, 2, 3})").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 3, 5, 1, 9, [1, 2, 3], {1, 2})").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e, f, g) VALUES (1, 4, 5, 7, 5, [], {1})").get(); + + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 5").get(), exceptions::invalid_request_exception); + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE d = 1").get(), exceptions::invalid_request_exception); + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE e = 5").get(), exceptions::invalid_request_exception); + + // Collection filtering queries are not supported yet + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE f contains 2").get(), exceptions::invalid_request_exception); + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE g contains 1").get(), exceptions::invalid_request_exception); + + auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 3 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(3), + int32_type->decompose(4), + int32_type->decompose(5) + }}); + + msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE e >= 5 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(3), + int32_type->decompose(4), + int32_type->decompose(5) + }, + { + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(5), + int32_type->decompose(1), + int32_type->decompose(9) + }, + { + int32_type->decompose(1), + int32_type->decompose(4), + int32_type->decompose(5), + int32_type->decompose(7), + int32_type->decompose(5) + } + }); + + msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 5 and e = 9 and d = 1 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(5), + int32_type->decompose(1), + int32_type->decompose(9) + }}); + + cql3::prepared_cache_key_type prepared_id = e.prepare("SELECT a, b, c, d, e FROM t WHERE a = ? and d = ? ALLOW FILTERING").get0(); + std::vector raw_values { + cql3::raw_value::make_value(int32_type->decompose(1)), + cql3::raw_value::make_value(int32_type->decompose(1)) + }; + msg = e.execute_prepared(prepared_id, raw_values).get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1) + }, + { + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(5), + int32_type->decompose(1), + int32_type->decompose(9) + } + }); + + prepared_id = e.prepare("SELECT a, b, c, d, e FROM t WHERE a = ? and d = ? ALLOW FILTERING").get0(); + raw_values[1] = cql3::raw_value::make_value(int32_type->decompose(9)); + msg = e.execute_prepared(prepared_id, raw_values).get0(); + assert_that(msg).is_rows().with_size(0); + + + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_desc) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY((a, b), c, d)) WITH CLUSTERING ORDER BY (c DESC);").get(); + e.require_table_exists("ks", "t").get(); + + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 1, 1, 1)").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 3, 4, 5)").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 5, 1, 9)").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 6, 7, 5)").get(); + + auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c > 3 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(6), + int32_type->decompose(7), + int32_type->decompose(5) + }, + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(5), + int32_type->decompose(1), + int32_type->decompose(9) + } + }); + + msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c < 4 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(3), + int32_type->decompose(4), + int32_type->decompose(5) + }, + { + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1) + } + }); + + msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 4 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_size(0); + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_with_secondary_index) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY(a, b));").get(); + e.require_table_exists("ks", "t").get(); + e.execute_cql("CREATE INDEX ON t(c)").get(); + + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 1, 1, 1, 1)").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 2, 3, 4, 5)").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 3, 5, 1, 9)").get(); + e.execute_cql("INSERT INTO t (a, b, c, d, e) VALUES (1, 4, 5, 7, 5)").get(); + + auto msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 3").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(3), + int32_type->decompose(4), + int32_type->decompose(5) + }}); + + BOOST_CHECK_THROW(e.execute_cql("SELECT * FROM t WHERE c = 5 and d = 1").get(), exceptions::invalid_request_exception); + + msg = e.execute_cql("SELECT a, b, c, d, e FROM t WHERE c = 5 and d = 1 ALLOW FILTERING").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(5), + int32_type->decompose(1), + int32_type->decompose(9) + }}); + + e.execute_cql("CREATE TABLE t2 (pk1 int, pk2 int, c1 int, c2 int, v int, PRIMARY KEY ((pk1, pk2), c1, c2));").get(); + e.execute_cql("CREATE INDEX ON t2(v)").get(); + for (int i = 1; i <= 5; ++i) { + for (int j = 1; j <= 2; ++j) { + e.execute_cql(format("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES ({}, {}, {}, {}, {})", j, 1, 1, 1, i)).get(); + e.execute_cql(format("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES ({}, {}, {}, {}, {})", j, 1, 1, i, i)).get(); + e.execute_cql(format("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES ({}, {}, {}, {}, {})", j, 1, i, i, i)).get(); + e.execute_cql(format("INSERT INTO t2 (pk1, pk2, c1, c2, v) VALUES ({}, {}, {}, {}, {})", j, i, i, i, i)).get(); + } + } + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 0 AND c1 < 5 AND c2 = 1 AND v = 3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows({}); + }); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 0 AND c1 < 5 AND c2 = 3 AND v = 3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3) + }, + { + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(3) + }, + { + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3) + } + }); + }); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c2 > 1 AND c2 < 5 AND v = 1 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows({}); + }); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND c1 > 1 AND c2 > 2 AND v = 3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3) + }, + { + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3) + } + }); + }); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 = 1 AND pk2 > 1 AND c2 > 2 AND v = 3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows({{ + int32_type->decompose(1), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3), + int32_type->decompose(3) + }}); + }); + + eventually([&] { + auto msg = e.execute_cql("SELECT * FROM t2 WHERE pk1 >= 2 AND pk2 <=3 AND c1 IN(0,1,2) AND c2 IN(0,1,2) AND v < 3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows({ + { + int32_type->decompose(2), + int32_type->decompose(2), + int32_type->decompose(2), + int32_type->decompose(2), + int32_type->decompose(2) + }, + { + int32_type->decompose(2), + int32_type->decompose(1), + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(2) + }, + { + int32_type->decompose(2), + int32_type->decompose(1), + int32_type->decompose(2), + int32_type->decompose(2), + int32_type->decompose(2) + } + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_allow_filtering_limit) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE timeline (user text, c int, liked boolean, PRIMARY KEY (user, c));").get(); + e.execute_cql( + "BEGIN UNLOGGED BATCH \n" + "insert INTO timeline (user, c, liked) VALUES ('a',1,false); \n" + "insert INTO timeline (user, c, liked) VALUES ('a',2,false); \n" + "insert INTO timeline (user, c, liked) VALUES ('a',3,true); \n" + "insert INTO timeline (user, c, liked) VALUES ('a',4,false); \n" + "insert INTO timeline (user, c, liked) VALUES ('a',5,false); \n" + "insert INTO timeline (user, c, liked) VALUES ('a',6,false); \n" + "APPLY BATCH;" + ).get(); + + auto msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(3), boolean_type->decompose(true)}, + }); + + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(1), boolean_type->decompose(false)}, + { int32_type->decompose(2), boolean_type->decompose(false)}, + { int32_type->decompose(4), boolean_type->decompose(false)}, + { int32_type->decompose(5), boolean_type->decompose(false)}, + { int32_type->decompose(6), boolean_type->decompose(false)}, + }); + + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(3), boolean_type->decompose(true)}, + }); + + qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0(); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(1), boolean_type->decompose(false)}, + { int32_type->decompose(2), boolean_type->decompose(false)}, + { int32_type->decompose(4), boolean_type->decompose(false)}, + { int32_type->decompose(5), boolean_type->decompose(false)}, + { int32_type->decompose(6), boolean_type->decompose(false)}, + }); + + qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0(); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(1), boolean_type->decompose(false)}, + { int32_type->decompose(2), boolean_type->decompose(false)} + }); + + qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(1), boolean_type->decompose(false)}, + { int32_type->decompose(2), boolean_type->decompose(false)}, + { int32_type->decompose(4), boolean_type->decompose(false)} + }); + + auto extract_paging_state = [] (::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + auto paging_state = rows->rs().get_metadata().paging_state(); + assert(paging_state); + return ::make_shared(*paging_state); + }; + + auto count_rows_fetched = [] (::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + return rows->rs().result_set().size(); + }; + + qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); + auto paging_state = extract_paging_state(msg); + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(1), boolean_type->decompose(false)} + }); + + // Some pages might be empty and in such case we should continue querying + size_t rows_fetched = 0; + while (rows_fetched == 0) { + qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); + rows_fetched = count_rows_fetched(msg); + paging_state = extract_paging_state(msg); + } + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(2), boolean_type->decompose(false)} + }); + + rows_fetched = 0; + while (rows_fetched == 0) { + qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); + msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); + rows_fetched = count_rows_fetched(msg); + if (rows_fetched == 0) { + paging_state = extract_paging_state(msg); + } + } + assert_that(msg).is_rows().with_rows_ignore_order({ + { int32_type->decompose(4), boolean_type->decompose(false)} + }); + }); +} + +SEASTAR_TEST_CASE(test_filtering) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE cf (k int, v int,m int,n int,o int,p int static, PRIMARY KEY ((k,v),m,n));").get(); + e.execute_cql( + "BEGIN UNLOGGED BATCH \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (1, 1, 1, 1, 1 ,1 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (2, 1, 2, 1, 2 ,2 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (3, 1, 3, 1, 3 ,3 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (4, 2, 1, 2, 4 ,4 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (5, 2, 2, 2, 5 ,5 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (6, 2, 3, 2, 6 ,6 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (7, 3, 1, 3, 7 ,7 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (8, 3, 2, 3, 8 ,8 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (9, 3, 3, 3, 9 ,9 ); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (10, 4, 1, 4,10,10); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (11, 4, 2, 4,11,11); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (12, 5, 3, 5,12,12); \n" + "INSERT INTO cf (k, v, m, n, o, p) VALUES (12, 5, 4, 5,13,13); \n" + "APPLY BATCH;" + ).get(); + + // Notice the with_serialized_columns_count() check before the set comparison. + // Since we are dealing with the result set before serializing to the client, + // there is an extra column that is used for the filtering, this column will + // not be present in the responce to the client and with_serialized_columns_count() + // verifies exactly that. + + // test filtering on partition keys + { + auto msg = e.execute_cql("SELECT k FROM cf WHERE v=3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ + { int32_type->decompose(7), int32_type->decompose(3)}, + { int32_type->decompose(8), int32_type->decompose(3) }, + { int32_type->decompose(9), int32_type->decompose(3) }, + }); + } + + // test filtering on clustering keys + { + auto msg = e.execute_cql("SELECT k FROM cf WHERE n=4 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ + { int32_type->decompose(10), int32_type->decompose(4) }, + { int32_type->decompose(11), int32_type->decompose(4) }, + }); + } + + //test filtering on regular columns + { + auto msg = e.execute_cql("SELECT k FROM cf WHERE o>7 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ + { int32_type->decompose(8), int32_type->decompose(8) }, + { int32_type->decompose(9), int32_type->decompose(9) }, + { int32_type->decompose(10), int32_type->decompose(10) }, + { int32_type->decompose(11), int32_type->decompose(11) }, + { int32_type->decompose(12), int32_type->decompose(12) }, + { int32_type->decompose(12), int32_type->decompose(13) }, + }); + } + + //test filtering on static columns + { + auto msg = e.execute_cql("SELECT k FROM cf WHERE p>=10 AND p<=12 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_serialized_columns_count(1).with_rows_ignore_order({ + { int32_type->decompose(10), int32_type->decompose(10) }, + { int32_type->decompose(11), int32_type->decompose(11) }, + }); + } + //test filtering with count + { + auto msg = e.execute_cql("SELECT COUNT(k) FROM cf WHERE n>3 ALLOW FILTERING;").get0(); + assert_that(msg).is_rows().with_serialized_columns_count(1).with_size(1).with_rows_ignore_order({ + { long_type->decompose(4L), int32_type->decompose(4) }, + }); + } + + }); +}