secondary index: fix paging in map value indexing

When indexing a map column's *values*, if the same value appears more
than once, the same row will appear in the index more than once. We had
code that removed these duplicates, but this deduplication did not work
across page boundaries. We had two xfailing tests to demonstrate this bug.

In this patch we fix this bug by looking at the page's start and not
generating the same row again, thereby getting the same deduplication
we had inside pages - now across pages.

The previously-xfailing tests now pass, and their xfail tag is removed.
I also added another test, for the case where the base table has only
partition keys without clustering keys. This second test is important
because the code path for the partition-key-only case is different,
and the second test exposed a bug in it as well (which is also fixed
in this patch).

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
Nadav Har'El
2022-07-26 17:18:44 +03:00
parent dc445b9a73
commit f6f18b187a
2 changed files with 109 additions and 5 deletions

View File

@@ -1300,6 +1300,12 @@ indexed_table_select_statement::find_index_partition_ranges(query_processor& qp,
// to avoid outputting the same partition key twice, but luckily in
// the sorted order, these will be adjacent.
std::optional<dht::decorated_key> last_dk;
if (options.get_paging_state()) {
auto paging_state = options.get_paging_state();
auto base_pk = generate_base_key_from_index_pk<partition_key>(paging_state->get_partition_key(),
paging_state->get_clustering_key(), *_schema, *_view_schema);
last_dk = dht::decorate_key(*_schema, base_pk);
}
for (size_t i = 0; i < rs.size(); i++) {
const auto& row = rs.at(i);
std::vector<bytes> pk_columns;
@@ -1338,6 +1344,22 @@ indexed_table_select_statement::find_index_clustering_rows(query_processor& qp,
primary_keys.reserve(rs.size());
std::optional<std::reference_wrapper<primary_key>> last_primary_key;
// Set last_primary_key if indexing map values and not in the first
// query page. See comment below why last_primary_key is needed for
// indexing map values. We have a test for this with paging:
// test_secondary_index.py::test_index_map_values_paging.
std::optional<primary_key> page_start_primary_key;
if (_index.target_type() == cql3::statements::index_target::target_type::collection_values &&
options.get_paging_state()) {
auto paging_state = options.get_paging_state();
auto base_pk = generate_base_key_from_index_pk<partition_key>(paging_state->get_partition_key(),
paging_state->get_clustering_key(), *_schema, *_view_schema);
auto base_dk = dht::decorate_key(*_schema, base_pk);
auto base_ck = generate_base_key_from_index_pk<clustering_key>(paging_state->get_partition_key(),
paging_state->get_clustering_key(), *_schema, *_view_schema);
page_start_primary_key = primary_key{std::move(base_dk), std::move(base_ck)};
last_primary_key = *page_start_primary_key;
}
for (size_t i = 0; i < rs.size(); i++) {
const auto& row = rs.at(i);
auto pk_columns = _schema->partition_key_columns() | boost::adaptors::transformed([&] (auto& cdef) {

View File

@@ -711,7 +711,6 @@ def test_index_map_values(cql, test_keyspace):
# There is a complication, that this de-duplication does not easily span
# *paging*. So the purpose of this test is to check that paging does not
# cause the same row to be returned more than once.
@pytest.mark.xfail(reason="duplicates in map value index + paging")
def test_index_map_values_paging(cql, test_keyspace):
schema = 'pk int, ck int, m map<int,int>, PRIMARY KEY (pk, ck)'
with new_test_table(cql, test_keyspace, schema) as table:
@@ -731,8 +730,7 @@ def test_index_map_values_paging(cql, test_keyspace):
# In the previous test (test_index_map_values*) all tests involved a single
# row, which could match a search, or not. In this test we verify that the
# case of multiple matching rows also works.
@pytest.mark.xfail(reason="duplicates in map value index + paging")
def test_index_map_values_multiple_matching_rows(cql, test_keyspace):
def test_index_map_values_multiple_matching_rows(cql, test_keyspace, driver_bug_1):
schema = 'pk int, ck int, m map<int,int>, PRIMARY KEY (pk, ck)'
with new_test_table(cql, test_keyspace, schema) as table:
# index m (same as values(m)). Will allow "CONTAINS".
@@ -741,15 +739,47 @@ def test_index_map_values_multiple_matching_rows(cql, test_keyspace):
# the value 3 in them somewhere, others don't. One of the maps has
# multiple occurances of the value 3, so we also reproduce here the
# same bug that test_index_map_values_paging() reproduces.
# Note: Scylla needs to skip a duplicate 3 value in (2,4) which
# results in an empty page in the result set when page_size=1. We
# need the driver to correctly support this, hence the "driver_bug_1".
cql.execute(f'INSERT INTO {table} (pk, ck, m) VALUES (1, 2, {{1:2, 3:4}})')
cql.execute(f'INSERT INTO {table} (pk, ck, m) VALUES (1, 3, {{1:3, 3:4}})')
cql.execute(f'INSERT INTO {table} (pk, ck, m) VALUES (1, 4, {{7:3}})')
cql.execute(f'INSERT INTO {table} (pk, ck, m) VALUES (2, 2, {{1:3, 2:3, 3:4}})')
cql.execute(f'INSERT INTO {table} (pk, ck, m) VALUES (2, 4, {{}})')
assert [(1,3),(1,4),(2,2)] == list(cql.execute(f'SELECT pk,ck FROM {table} WHERE m CONTAINS 3'))
cql.execute(f'INSERT INTO {table} (pk, ck, m) VALUES (2, 5, {{7:3}})')
assert [(1,3),(1,4),(2,2),(2,5)] == list(cql.execute(f'SELECT pk,ck FROM {table} WHERE m CONTAINS 3'))
for page_size in [1, 2, 3, 7]:
stmt = SimpleStatement(f'SELECT pk,ck FROM {table} WHERE m CONTAINS 3', fetch_size=page_size)
assert [(1,3),(1,4),(2,2)] == list(cql.execute(stmt))
assert [(1,3),(1,4),(2,2),(2,5)] == list(cql.execute(stmt))
# In the previous tests (test_index_map_values*) all tests involved a base
# table with both partition keys and clustering keys. Because some of the
# implementation is different depending the schema has clustering keys,
# let's also write a similar test with just a partition key:
def test_index_map_values_partition_key_only(cql, test_keyspace, driver_bug_1):
schema = 'pk int, m map<int,int>, PRIMARY KEY (pk)'
with new_test_table(cql, test_keyspace, schema) as table:
# index m (same as values(m)). Will allow "CONTAINS".
cql.execute(f'CREATE INDEX ON {table}(m)')
# Insert several rows with several different maps, some of them have
# the value 3 in them somewhere, others don't. One of the maps has
# multiple occurances of the value 3, so we also reproduce here the
# same bug that test_index_map_values_paging() reproduces (and here
# test its intersection with the case of no clustering key).
cql.execute(f'INSERT INTO {table} (pk, m) VALUES (1, {{1:2, 3:4}})')
cql.execute(f'INSERT INTO {table} (pk, m) VALUES (2, {{1:3, 3:4}})')
cql.execute(f'INSERT INTO {table} (pk, m) VALUES (3, {{7:3}})')
cql.execute(f'INSERT INTO {table} (pk, m) VALUES (4, {{1:3, 2:3, 3:4}})')
cql.execute(f'INSERT INTO {table} (pk, m) VALUES (5, {{}})')
assert [(2,), (3,), (4,)] == sorted(cql.execute(f'SELECT pk FROM {table} WHERE m CONTAINS 3'))
for page_size in [1, 2, 3, 7]:
stmt = SimpleStatement(f'SELECT pk FROM {table} WHERE m CONTAINS 3', fetch_size=page_size)
assert [(2,), (3,), (4,)] == sorted(cql.execute(stmt))
# I wanted to check here that page_size is actually obeyed,
# but we can't - when Scylla skips one of the duplicate values
# it can result in a smaller page, and while not great (Cassandra
# doesn't do it, all its pages are full size), it's legal.
def test_index_map_keys(cql, test_keyspace):
schema = 'pk int, ck int, m map<int,int>, PRIMARY KEY (pk, ck)'
@@ -1096,3 +1126,55 @@ def test_secondary_collection_index(cql, test_keyspace):
for tab in [tab1, tab2]:
op(tab, **args)
test_all_possible_selects()
# Test that paging through a select using a secondary index works as
# expected, returning pages of the requested size.
# We have several tests here, for different schemas, that exercises
# different code paths and may expose different bugs.
def test_index_paging_pk_ck(cql, test_keyspace):
schema = 'p int, c int, x int, primary key (p,c)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE INDEX ON {table}(x)")
insert = cql.prepare(f"INSERT INTO {table}(p,c,x) VALUES (?,?,?)")
for i in range(10):
cql.execute(insert, [i, i, 3])
cql.execute(insert, [17, 17, 2])
for page_size in [1, 2, 3, 100]:
stmt = SimpleStatement(f"SELECT p FROM {table} WHERE x = 3", fetch_size=page_size)
# Check that:
# 1. Each page of results has the expected page_size, or less in
# the last page. Although partial pages are theoretically
# allowed (and happen in other tests), in this test we don't
# expect Scylla or Cassandra to generate them.
# 2. Check that all the results read over all pages are the
# expected ones (0...9)
all_rows = []
results = cql.execute(stmt)
while len(results.current_rows) == page_size:
all_rows.extend(results.current_rows)
results = cql.execute(stmt, paging_state=results.paging_state)
# After pages of page_size, the last page should be partial
assert len(results.current_rows) < page_size
all_rows.extend(results.current_rows)
# Finally check that altogether, we read the right rows.
assert sorted(all_rows) == [(i,) for i in range(10)]
def test_index_paging_pk_only(cql, test_keyspace):
schema = 'p int, x int, primary key (p)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE INDEX ON {table}(x)")
insert = cql.prepare(f"INSERT INTO {table}(p,x) VALUES (?,?)")
for i in range(10):
cql.execute(insert, [i, 3])
cql.execute(insert, [17, 2])
for page_size in [1, 2, 3, 100]:
stmt = SimpleStatement(f"SELECT p FROM {table} WHERE x = 3", fetch_size=page_size)
all_rows = []
results = cql.execute(stmt)
while len(results.current_rows) == page_size:
all_rows.extend(results.current_rows)
results = cql.execute(stmt, paging_state=results.paging_state)
assert len(results.current_rows) < page_size
all_rows.extend(results.current_rows)
assert sorted(all_rows) == [(i,) for i in range(10)]