Files
scylladb/test/cqlpy/test_limit.py
Paweł Zakrzewski d483051e44 cql3/select_statement: reject aggregate functions when PER PARTITION LIMIT is present
Before this patch we silently allowed and ignored PER PARTITION LIMIT.
While using aggregate functions in conjunction with PER PARTITION LIMIT
can make sense, we want to disable it until we can offer proper
implementation, see #9879 for discussion.

We want to match Cassandra, and for queries with aggregate functions it
behaves as follows:
- it silently ignores PER PARTITION LIMIT if GROUP BY is present, which
  matches our previous implementation.
- rejects PER PARTITION LIMIT when GROUP BY is *not* present.

This patch adds rejection of the second group.

Fixes #9879

Closes scylladb/scylladb#23086
2025-03-13 10:29:53 +02:00

202 lines
11 KiB
Python

# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#############################################################################
# Tests for SELECT's LIMIT feature
#
# Note that LIMIT can be combined with many other features: GROUP BY,
# DISTINCT, CLUSTERING ORDER, aggregations, secondary indexes, paging,
# and more. So some tests involving a combination of these features are
# in other test files, not here:
# * LIMIT with GROUP BY (with and without aggregation) is tested in test_group_by.py
# * LIMIT with ORDER BY is in test_clustering_order.py
# * LIMIT with SELECT DISTINCT is in test_distinct.py
# * LIMIT with filtering is in test_secondary_index.py. Those tests check
# that LIMIT should limit the number of post-filtering results, and run both
# with and *without* a secondary index.
#
# As usual, all tests in this suite are run on a single node, so if we have
# bugs in the way that a coordinator aggregates results from multiple
# replicas, they are not tested here.
#
# All tests currently in this file only verify correctness - not performance:
# The tests don't check if the LIMIT is implemented efficiently (the replica
# stops reading after LIMIT rows) or in some inefficient manner (e.g., 1MB of
# data is read and then the excess data is dropped). We can consider later
# how to test performance aspects - perhaps using metrics.
#############################################################################
import pytest
from .util import unique_key_int, new_test_table, new_function, new_aggregate
from cassandra.protocol import InvalidRequest
@pytest.fixture(scope="module")
def table1(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int, c int, PRIMARY KEY (p, c)") as table:
yield table
# A basic test for "LIMIT" combined with no other feature (no filtering, no
# paging, no group by, etc.). There are two scenarios to check - a full table
# scan with a LIMIT, and a single-partition scan with a LIMIT.
def test_limit_full_table_scan(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int primary key") as table:
stmt = cql.prepare(f'INSERT INTO {table} (p) VALUES (?)')
for i in range(10):
cql.execute(stmt, [i])
# We don't know how partitions get ordered, so let's scan the whole
# table to get the expected order
expected = list(cql.execute(f'SELECT p from {table}'))
for limit in [1, 3, 7]:
assert expected[0:limit] == list(cql.execute(f'SELECT p from {table} LIMIT {limit}'))
def test_limit_single_partition_scan(cql, table1):
p = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table1} (p, c) VALUES ({p}, ?)')
for i in range(10):
cql.execute(stmt, [i])
for limit in [1, 3, 7]:
assert list(range(limit)) == [x.c for x in cql.execute(f'SELECT c from {table1} WHERE p={p} LIMIT {limit}')]
# Check that *paging* is correctly supported with LIMIT - paging should continue
# until reaching LIMIT number of total results (and not forget how much we
# still need to read until the limit).
def test_limit_single_partition_scan_with_paging(cql, table1):
p = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table1} (p, c) VALUES ({p}, ?)')
for i in range(10):
cql.execute(stmt, [i])
# If we read with fetch_size=3 and LIMIT=7, we should read a first page
# with 3 results, and then two more pages, until a total of 7 results.
# If fetch_size=8 and LIMIT=7, we should read just 7 results in the
# first page.
limit = 7
s = cql.prepare(f'SELECT c from {table1} WHERE p={p} LIMIT {limit}')
for page_size in range(10):
s.fetch_size = page_size
results = cql.execute(s)
if page_size > 0:
assert len(results.current_rows) == min(page_size, limit)
else:
# page_size=0 means use some large default, so we expect all
# "limit" results in the first page
assert len(results.current_rows) == limit
# Finish reading all the pages and expect the full "limit":
assert list(range(limit)) == [x.c for x in cql.execute(f'SELECT c from {table1} WHERE p={p} LIMIT {limit}')]
# Check the combination of LIMIT and PER PARTITION LIMIT. It should only
# return the first PER PARTITION LIMIT rows from each partition, up to
# a total of LIMIT rows overall. Check with and without paging.
def test_limit_and_per_partition_limit(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int, c int, primary key (p,c)") as table:
stmt = cql.prepare(f'INSERT INTO {table} (p,c) VALUES (?, ?)')
# This test insert 9 items (p,c), 0<=p,c<3, and then reads them with
# PER PARTITION LIMIT 2 and different LIMITs and page sizes.
for i in range(3):
for j in range(3):
cql.execute(stmt, [i, j])
expected = [(x.p, x.c) for x in cql.execute(f'SELECT p,c from {table}')]
# check the results with PER PARTITION LIMIT 2, without a LIMIT:
expected_ppl2 = [e for i, e in enumerate(expected) if i % 3 != 2]
assert expected_ppl2 == [(x.p, x.c) for x in cql.execute(f'SELECT p,c from {table} PER PARTITION LIMIT 2')]
# Now check with different LIMITs:
for limit in [1,2,3,4,5,7,20]:
s = cql.prepare(f'SELECT p,c from {table} PER PARTITION LIMIT 2 LIMIT {limit}')
for page_size in [0, 1, 2]:
s.fetch_size = page_size
results = cql.execute(s)
if page_size > 0:
assert len(results.current_rows) == min(page_size, limit, len(expected_ppl2))
else:
# page_size=0 means use some large default, so first page will
# have all results
assert len(results.current_rows) == min(limit, len(expected_ppl2))
# Finish reading all the pages and expect the first "limit" rows of expected_ppl2
assert expected_ppl2[0:limit] == [(x.p, x.c) for x in results]
# Check the combination of LIMIT and IN. The LIMIT should constrain the total number
# of results, from either partition. Check with and without paging.
def test_limit_and_in(cql, table1):
p1 = unique_key_int()
p2 = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table1} (p, c) VALUES (?, ?)')
for i in range(5):
cql.execute(stmt, [p1,i])
cql.execute(stmt, [p2,i])
all_results = list(cql.execute(f'SELECT p,c FROM {table1} WHERE p IN ({p1},{p2})'))
for limit in [1,2,6,12]:
s = cql.prepare(f'SELECT p,c FROM {table1} WHERE p IN ({p1},{p2}) LIMIT {limit}')
for page_size in [0, 1, 2, 5, 6]:
s.fetch_size = page_size
results = cql.execute(s)
if page_size > 0:
assert len(results.current_rows) == min(page_size, limit, len(all_results))
else:
assert len(results.current_rows) == min(limit, len(all_results))
# Finish reading all the pages and expect the first "limit" rows of expected_ppl2
assert all_results[0:limit] == list(results)
# Check the combination of LIMIT and tombstones (deleted rows).
# LIMIT should always limit the number of actual results, so tombstones
# aren't counted (tombstones can cause a smaller *page* of results, but this
# isn't the same as LIMIT).
def test_limit_single_partition_scan_with_tombstones(cql, table1):
p = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table1} (p, c) VALUES ({p}, ?)')
dstmt = cql.prepare(f'DELETE FROM {table1} WHERE p={p} AND c=?')
for i in range(10):
cql.execute(stmt, [i])
if i%2 == 0:
cql.execute(dstmt, [i])
assert [1,3,5,7] == [x.c for x in cql.execute(f'SELECT c from {table1} WHERE p={p} LIMIT 4')]
# Check that LIMIT must be strictly positive.
def test_limit_must_be_positive(cql, table1):
stmt = cql.prepare(f'SELECT c from {table1} LIMIT ?')
with pytest.raises(InvalidRequest, match="LIMIT must be strictly positive"):
cql.execute(stmt, [0])
with pytest.raises(InvalidRequest, match="LIMIT must be strictly positive"):
cql.execute(stmt, [-1])
# Check that PER PARTITION LIMIT must be strictly positive.
def test_per_partition_limit_must_be_positive(cql, table1):
stmt = cql.prepare(f'SELECT c from {table1} PER PARTITION LIMIT ?')
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT must be strictly positive"):
cql.execute(stmt, [0])
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT must be strictly positive"):
cql.execute(stmt, [-1])
# Check that PER PARTITION LIMIT is not allowed with aggregate queries.
# Reproduces #9879
def test_limit_aggregate_queries(cql, table1):
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT is not allowed with aggregate queries."):
cql.execute(f"SELECT count(*) FROM {table1} PER PARTITION LIMIT 1")
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT is not allowed with aggregate queries."):
cql.execute(f"SELECT min(c) FROM {table1} PER PARTITION LIMIT 1")
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT is not allowed with aggregate queries."):
cql.execute(f"SELECT max(c) FROM {table1} PER PARTITION LIMIT 1")
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT is not allowed with aggregate queries."):
cql.execute(f"SELECT avg(c) FROM {table1} PER PARTITION LIMIT 1")
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT is not allowed with aggregate queries."):
cql.execute(f"SELECT sum(c) FROM {table1} PER PARTITION LIMIT 1")
# Test the PER PARTITION LIMIT with a UDA
# This aggregate function multiplies all the values together
# We first ensure that the aggregate works correctly for 9!
# Then we test that the PER PARTITION LIMIT is rejected
# Reproduces #9879
def test_limit_aggregate_queries_with_uda(scylla_only, cql, table1, test_keyspace):
myproduct_function_body = "(acc bigint, val int) RETURNS NULL ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return acc * val;'"
with new_function(cql, test_keyspace, myproduct_function_body) as myproduct_function:
aggregate_body = f"(int) SFUNC {myproduct_function} STYPE bigint INITCOND 1"
with new_aggregate(cql, test_keyspace, aggregate_body) as myproduct:
cql.execute(f"TRUNCATE TABLE {table1}")
stmt = cql.prepare(f"INSERT INTO {table1} (p, c) VALUES (?, ?)")
for i in range(1,10):
cql.execute(stmt, [i, i])
# assert that the product of all the values is 9!
result = cql.execute(f"SELECT {myproduct}(c) FROM {table1}")
assert 362880 == list(result)[0][0]
with pytest.raises(InvalidRequest, match="PER PARTITION LIMIT is not allowed with aggregate queries."):
cql.execute(f"SELECT {myproduct}(c) FROM {table1} PER PARTITION LIMIT 1")