From 9535abf552ac25b796cb2c56581080d2859a791f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Tue, 18 Jun 2024 07:04:01 +0200 Subject: [PATCH 1/2] cql3/select_statement: do not parallelize single-partition aggregations Currently, aggregation reads whose WHERE clause restricts them to a single partition are unnecessarily parallelized. This commit checks for that condition so that single-partition reads no longer go through forward_service. (cherry picked from commit e9ace7c2031523099de0b076b9678264ddd74e5c) --- cql3/statements/select_statement.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 7c90810000..06f02d1a36 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -2004,7 +2004,10 @@ std::unique_ptr select_statement::prepare(data_dictionary::d ) && !restrictions->need_filtering() // No filtering && group_by_cell_indices->empty() // No GROUP BY - && db.get_config().enable_parallelized_aggregation(); + && db.get_config().enable_parallelized_aggregation() + && !( // Do not parallelize the request if it's single partition read + restrictions->partition_key_restrictions_is_all_eq() + && restrictions->partition_key_restrictions_size() == schema->partition_key_size()); }; if (_parameters->is_prune_materialized_view()) { From bea1f4891df8130c63788fba65ee27bdf2d7d73b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Tue, 18 Jun 2024 07:08:53 +0200 Subject: [PATCH 2/2] test/boost/cql_query_test: add test for single-partition aggregation (cherry picked from commit 8eb5ca820289d3ebec42e3108f3706bd0156ef79) --- test/boost/cql_query_test.cc | 68 ++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 33068d68c4..a999d87718 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc 
@@ -26,6 +26,8 @@ #include #include #include "transport/messages/result_message.hh" +#include "transport/messages/result_message_base.hh" +#include "types/types.hh" #include "utils/big_decimal.hh" #include "types/user.hh" #include "types/map.hh" @@ -5361,6 +5363,72 @@ SEASTAR_TEST_CASE(test_parallelized_select_counter_type) { }); } +SEASTAR_TEST_CASE(test_single_partition_aggregation_is_not_parallelized) { + // From a performance point of view, it's pointless to parallelize + // aggregation queries which read only a single partition. + + return with_parallelized_aggregation_enabled_thread([](cql_test_env& e) { + auto& qp = e.local_qp(); + const auto stat_parallelized = qp.get_cql_stats().select_parallelized; + + e.execute_cql("CREATE TABLE tbl (pk int, ck int, col int, PRIMARY KEY (pk, ck));").get(); + const int value_count = 10; + for (int pk = 0; pk < 2; pk++) { + for (int c = 0; c < value_count; c++) { + e.execute_cql(format("INSERT INTO tbl (pk, ck, col) VALUES ({:d}, {:d}, {:d});", pk, c, c)).get(); + } + } + + const auto result1 = e.execute_cql("SELECT COUNT(*) FROM tbl WHERE pk = 1;").get(); + assert_that(result1).is_rows().with_rows({ + {long_type->decompose(int64_t(value_count))} + }); + BOOST_CHECK_EQUAL(stat_parallelized, qp.get_cql_stats().select_parallelized); + + const auto result2 = e.execute_cql("SELECT COUNT(*) FROM tbl WHERE pk = 1 AND ck = 1;").get(); + assert_that(result2).is_rows().with_rows({ + {long_type->decompose(int64_t(1))} + }); + BOOST_CHECK_EQUAL(stat_parallelized, qp.get_cql_stats().select_parallelized); + + const auto result3 = e.execute_cql("SELECT COUNT(*) FROM tbl WHERE token(pk) = 1;").get(); + // We don't check the value of count(*) here, only that the query wasn't parallelized + BOOST_CHECK_EQUAL(stat_parallelized, qp.get_cql_stats().select_parallelized); + + const auto result4 = e.execute_cql("SELECT COUNT(*) FROM tbl WHERE pk = 1 AND pk = 2;").get(); + assert_that(result4).is_rows().with_rows({ + {long_type->decompose(int64_t(0))} + }); + 
BOOST_CHECK_EQUAL(stat_parallelized, qp.get_cql_stats().select_parallelized); + + + e.execute_cql("CREATE TABLE tbl2 (pk1 int, pk2 int, ck int, col int, PRIMARY KEY((pk1, pk2), ck));").get(); + for (int pk1 = 0; pk1 < 2; pk1++) { + for (int pk2 = 0; pk2 < 2; pk2++) { + for (int c = 0; c < value_count; c++) { + e.execute_cql(format("INSERT INTO tbl2 (pk1, pk2, ck, col) VALUES ({:d}, {:d}, {:d}, {:d});", pk1, pk2, c, c)).get(); + } + } + } + + const auto result_pk12 = e.execute_cql("SELECT COUNT(*) FROM tbl2 WHERE pk1 = 1 AND pk2 = 0;").get(); + assert_that(result_pk12).is_rows().with_rows({ + {long_type->decompose(int64_t(value_count))} + }); + BOOST_CHECK_EQUAL(stat_parallelized, qp.get_cql_stats().select_parallelized); + + // A query with only a partly restricted partition key requires the `ALLOW FILTERING` clause, + // and we don't parallelize queries which need filtering. + // See issue #19369. + const auto result_pk1 = e.execute_cql("SELECT COUNT(*) FROM tbl2 WHERE pk1 = 1 ALLOW FILTERING;").get(); + // The result of this query also contains a column for pk1 + assert_that(result_pk1).is_rows().with_rows({ + {long_type->decompose(int64_t(value_count * 2)), int32_type->decompose(int32_t(1))} + }); + BOOST_CHECK_EQUAL(stat_parallelized, qp.get_cql_stats().select_parallelized); + }); +} + static future<> with_udf_and_parallel_aggregation_enabled_thread(std::function&& func) { auto db_cfg_ptr = make_shared(); auto& db_cfg = *db_cfg_ptr;